Golioth makes it easy to upload large data payloads from your IoT devices to the cloud. We do this using the blockwise Stream service, which includes a versatile set of transform and destination options so the data goes where you want, in the format that you need.
Our last post on Blockwise Stream covered the basics of calculating the blocks and sending then one-by-one to the Golioth server. But in the name of simplicity that example used a synchronous API call that blocked program flow until all data had been sent. Today let’s jump in and tackle the topics of proper asynchronous blockwise Stream, including dynamic allocation and accessing data in different ways.
Prerequisites
If you want to give this a try yourself, here’s what you need to have solved before jumping in today:
- Have a Golioth project set up with a Pipeline to receive your Stream data
- Understand the concepts behind sending data block-by-block using the
golioth_stream_set_blockwise_sync()
API call
Both of these are covered in How to Use Golioth Blockwise Stream to Upload Large IoT Data Payloads.
Using a Dynamically Allocated Data Wrapper
When using the async blockwise stream API it will be our responsibility for tracking the upload process through each block. To facilitate this, we can create a new struct in our program.
struct block_stream_container { struct blockwise_transfer *transfer_ctx; uint8_t *data; size_t data_len; uint32_t block_idx; size_t block_size; bool is_last; };
The blockwise_transfer member of this struct is required by the Golioth Firmware SDK when using the async blockwise stream system. We’ll hold onto it alongside a pointer to the data we want to send and the total length of that data. The rest of the struct tracks the progress of the transfer.
A Helper Function to Send Blocks
We need to call the async API once for each block, so let’s set up a function just for this purpose.
enum golioth_status send_next_block(struct block_stream_container *container) { if (NULL == container) { LOG_ERR("Container cannot be NULL"); return GOLIOTH_ERR_NULL; } uint32_t offset = container->block_idx * container->block_size; /* Test to see if this is the final block of data we need to send */ if (container->data_len <= offset + container->block_size) { /* Overwrite the block_size with number of bytes in last block */ container->block_size = container->data_len - offset; /* Indicate to the Golioth Firmware SDK that this is the final block */ container->is_last = true; } LOG_INF("Sending block %u of %zu bytes.%s", container->block_idx, container->block_size, container->is_last ? " Final block." : ""); return golioth_stream_blockwise_set_block_async(container->transfer_ctx, container->block_idx, container->data + offset, container->block_size, container->is_last, on_stream_block_sent, container); }
This function calculates information for sending the next block. We’re not going to go over this part again as it was covered in detail in the previous post. Just remember that we need to calculate the offset for the next block (using that value to read from the correct place in memory). We also need to handle the size and flag when sending the final block.
This function submits each block to the Golioth Firmware SDK using the golioth_stream_blockwise_set_block_async()
API. Note that everything we need for parameters comes from the container struct. Well, everything except the callback which you could store in the struct if you need different callbacks for some types of blockwise stream. The final argument is the pointer to our struct which will be passed to the callback function. This will be important later!
Beginning the Asyncronous Blockwise Stream
We haven’t yet discussed the async callback function, but let’s first look at initiating the blockwise stream as this is where our memory handling adventure begins.
static void send_blockwise_stream(void) { struct block_stream_container *container = k_malloc(sizeof(struct block_stream_container)); if (NULL == container) { LOG_ERR("Failed to allocate container; stream aborted"); return; } container->transfer_ctx = golioth_stream_blockwise_start(client, "my-test-path", GOLIOTH_CONTENT_TYPE_JSON); if (NULL == container->transfer_ctx) { LOG_ERR("Failed to allocate transfer context; stream aborted."); k_free(container); return; } container->data = (uint8_t *)json_data; container->data_len = json_data_len; container->block_idx = 0; container->block_size = CONFIG_GOLIOTH_BLOCKWISE_UPLOAD_MAX_BLOCK_SIZE; container->is_last = 0; enum golioth_status status = send_next_block(container); if (GOLIOTH_OK != status) { LOG_ERR("Failed to start blockwise stream: %d", status); /* Clean up memory before returning */ golioth_stream_blockwise_finish(container->transfer_ctx); k_free(container); return; } }
We begin by dynamically allocating memory for the container struct. This means the struct can outlive this function, remaining available to use as we send each block in the upload. Next we call golioth_stream_blockwise_start()
which takes the Golioth client pointer, the path to which we’re sending data, and the data type.
We will need to remember to call golioth_stream_blockwise_finish()
and k_free()
the context at the end of the stream operation, lest we create a memory leak. Note that we are calling these as part of the error handling in this function. But we will see them again in the next section of the post.
This function sets up everything for the stream operation, starting with a data pointer and length. This assumes the data you wish to send is already in memory and can be directly accessed. But later in the post we’ll discuss other options for accessing block data to send.
Worth noting is the use of CONFIG_GOLIOTH_BLOCKWISE_UPLOAD_MAX_BLOCK_SIZE
as the starting block size. This symbol is provided by the Golioth Firmware SDK and is the upper limit for sending blocks.
With everything configured, the send process begins by calling our send_next_block()
function we discussed in the previous section. As long as that returns GOLIOTH_OK
, program flow can continue and the asynchronous callbacks will handle the rest. Speaking of, we need to define our callback!
Async Callback
The Golioth Firmware SDK will call a user callback at the end of sending each block. This callback receives the success/failure code from the block, and it’s a great way for us to know when the SDK is ready for the next block. This may look like a lot, but it’s mostly error handling.
static void on_stream_block_sent(struct golioth_client *client, enum golioth_status status, const struct golioth_coap_rsp_code *coap_rsp_code, const char *path, size_t block_size, void *arg) { if (NULL == arg) { LOG_ERR("Expected block_stream_container pointer as arg but got NULL"); return; } struct block_stream_container *container = arg; if (GOLIOTH_OK != status) { if (GOLIOTH_ERR_COAP_RESPONSE == status) { LOG_ERR("Blockwise stream failed with CoAP code %d.%02d", coap_rsp_code->code_class, coap_rsp_code->code_detail); } else { LOG_ERR("Blockwise stream failed: %d", status); } goto end_blockwise_stream; } if (true == container->is_last) { LOG_INF("Blockwise stream completed successfully"); goto end_blockwise_stream; } /* Last block completed successfully but there are still more to send */ /* Increment the block number */ container->block_idx += 1; /* Ensure we use the block_size the server wants */ container->block_size = block_size; enum golioth_status err = send_next_block(container); if (GOLIOTH_OK != err) { LOG_ERR("Failed to send next block of blockwise stream: %d", status); goto end_blockwise_stream; } return; end_blockwise_stream: golioth_stream_blockwise_finish(container->transfer_ctx); k_free(container); }
Remember when we passed the pointer to our container struct as a user argument? We get that here in the callback function which can be first checked for null, then cast to our block_stream_container
type.
From there we check the status code that was returned by the Golioth Firmware SDK. This indicates that the block was successfully sent to the cloud (GOLIOTH_OK
) or an error if unsuccessful. Some types of error will return a CoAP error, which explains the complicated logging seen above when an error is received.
Next we check to see if the block we just sent is the last block of the transfer. If so, we can just return without further action. Otherwise now’s the time to increment the block index and call our send_next_block()
function. Note that we always use the block_size
received as a parameter of the callback. In practice this will almost always be the CONFIG_GOLIOTH_BLOCKWISE_UPLOAD_MAX_BLOCK_SIZE
value we used when the blockwise stream was started, it is (very unlikely but) possible the server will request a smaller block size and this is how we handle that case.
If at any point in this callback we encounter an error, or if we’ve already sent the final block, we need to clean up our memory here. The original calling function already went out of scope, so this will be the last time we have a pointer to that memory. We carefully clean up the transfer_context
used by the SDK async blockwise stream APIs, then free the container itself.
Putting It All Together
You can run the example code for yourself. Start with the stream sample from the Golioth Firmware SDK, replacing the main.c
file with the one below. Build, flash, and provision the code following the README in the SDK. The one caveat is that we need to make sure Zephyr includes k_malloc()
. The default for this sample is 0 so add the following Kconfig symbol to your prj.conf file:
CONFIG_HEAP_MEM_POOL_SIZE=128
This is quite a small heap but should make it a quick process to prove you do not have a memory leak.
/* * Copyright (c) 2025 Golioth, Inc. * * SPDX-License-Identifier: Apache-2.0 */ #include "golioth/golioth_status.h" #include <zephyr/logging/log.h> LOG_MODULE_REGISTER(golioth_stream, LOG_LEVEL_DBG); #include <golioth/client.h> #include <golioth/stream.h> #include <samples/common/sample_credentials.h> #include <samples/common/net_connect.h> #include <stdlib.h> #include <string.h> #include <zephyr/kernel.h> static const char json_data[] = "{\"my-quote\":\"1. The Imitation Game\\nI propose to consider the question, ‘Can machines think?’ This should begin with definitions of the meaning of the terms ‘machine’ and ‘think’. The definitions might be framed so as to reflect so far as possible the normal use of the words, but this attitude is dangerous. If the meaning of the words ‘machine’ and ‘think’ are to be found by examining how they are commonly used it is difficult to escape the conclusion that the meaning and the answer to the question, ‘Can machines think?’ is to be sought in a statistical survey such as a Gallup poll. But this is absurd. Instead of attempting such a definition I shall replace the question by another, which is closely related to it and is expressed in relatively unambiguous words.\\nThe new form of the problem can be described in terms of a game which we call the ‘imitation game’. It is played with three people, a man (A), a woman (B), and an interrogator (C) who may be of either sex. The interrogator stays in a room apart from the other two. The object of the game for the interrogator is to determine which of the other two is the man and which is the woman. He knows them by labels X and Y, and at the end of the game he says either ‘X is A and Y is B’ or ‘X is B and Y is A’. The interrogator is allowed to put questions to A and B thus\"}"; static size_t json_data_len = strlen(json_data); struct golioth_client *client; static K_SEM_DEFINE(connected, 0, 1); static void on_client_event(struct golioth_client *client, enum golioth_client_event event, void *arg) { bool is_connected = (event == GOLIOTH_CLIENT_EVENT_CONNECTED); if (is_connected) { k_sem_give(&connected); } LOG_INF("Golioth client %s", is_connected ? "connected" : "disconnected"); } struct block_stream_container { struct blockwise_transfer *transfer_ctx; uint8_t *data; size_t data_len; uint32_t block_idx; size_t block_size; bool is_last; }; /* forward declaration */ enum golioth_status send_next_block(struct block_stream_container *container); static void on_stream_block_sent(struct golioth_client *client, enum golioth_status status, const struct golioth_coap_rsp_code *coap_rsp_code, const char *path, size_t block_size, void *arg) { if (NULL == arg) { LOG_ERR("Expected block_stream_container pointer as arg but got NULL"); return; } struct block_stream_container *container = arg; if (GOLIOTH_OK != status) { if (GOLIOTH_ERR_COAP_RESPONSE == status) { LOG_ERR("Blockwise stream failed with CoAP code %d.%02d", coap_rsp_code->code_class, coap_rsp_code->code_detail); } else { LOG_ERR("Blockwise stream failed: %d", status); } goto end_blockwise_stream; } if (true == container->is_last) { LOG_INF("Blockwise stream completed successfully"); goto end_blockwise_stream; } /* Last block completed successfully but there are still more to send */ /* Increment the block number */ container->block_idx += 1; /* Ensure we use the block_size the server wants */ container->block_size = block_size; enum golioth_status err = send_next_block(container); if (GOLIOTH_OK != err) { LOG_ERR("Failed to send next block of blockwise stream: %d", status); goto end_blockwise_stream; } return; end_blockwise_stream: golioth_stream_blockwise_finish(container->transfer_ctx); k_free(container); } enum golioth_status send_next_block(struct block_stream_container *container) { if (NULL == container) { LOG_ERR("Container cannot be NULL"); return GOLIOTH_ERR_NULL; } uint32_t offset = container->block_idx * container->block_size; /* Test to see if this is the final block of data we need to send */ if (container->data_len <= offset + container->block_size) { /* Overwrite the block_size with number of bytes in last block */ container->block_size = container->data_len - offset; /* Indicate to the Golioth Firmware SDK that this is the final block */ container->is_last = true; } LOG_INF("Sending block %u of %zu bytes.%s", container->block_idx, container->block_size, container->is_last ? " Final block." : ""); return golioth_stream_blockwise_set_block_async(container->transfer_ctx, container->block_idx, container->data + offset, container->block_size, container->is_last, on_stream_block_sent, container); } static void send_blockwise_stream(void) { struct block_stream_container *container = k_malloc(sizeof(struct block_stream_container)); if (NULL == container) { LOG_ERR("Failed to allocate container; stream aborted"); return; } container->transfer_ctx = golioth_stream_blockwise_start(client, "my-test-path", GOLIOTH_CONTENT_TYPE_JSON); if (NULL == container->transfer_ctx) { LOG_ERR("Failed to allocate transfer context; stream aborted."); k_free(container); return; } container->data = (uint8_t *)json_data; container->data_len = json_data_len; container->block_idx = 0; container->block_size = CONFIG_GOLIOTH_BLOCKWISE_UPLOAD_MAX_BLOCK_SIZE; container->is_last = 0; enum golioth_status status = send_next_block(container); if (GOLIOTH_OK != status) { LOG_ERR("Failed to start blockwise stream: %d", status); /* Clean up memory before returning */ golioth_stream_blockwise_finish(container->transfer_ctx); k_free(container); return; } } int main(void) { LOG_DBG("Start Golioth blockwise stream sample"); net_connect(); /* Note: In production, credentials should be saved in secure storage. For * simplicity, we provide a utility that stores credentials as plaintext * settings. */ const struct golioth_client_config *client_config = golioth_sample_credentials_get(); client = golioth_client_create(client_config); golioth_client_register_event_callback(client, on_client_event, NULL); k_sem_take(&connected, K_FOREVER); /* Give network connection logs a few seconds to clear */ k_msleep(2000); int counter = 0; while(counter++ < 5) { send_blockwise_stream(); k_msleep(5000); } return 0; }
When monitoring the device via a serial terminal you will see logging messages that track the state of the blockwise upload:
*** Booting Zephyr OS build v4.1.0 *** *** Golioth Firmware SDK v0.20.0-6-gd6836844668c *** [01:03:50.293,041] <inf> golioth_settings_autoload: Initializing settings subsystem [01:03:50.294,606] <inf> fs_nvs: 8 Sectors of 4096 bytes [01:03:50.294,624] <inf> fs_nvs: alloc wra: 0, f90 [01:03:50.294,631] <inf> fs_nvs: data wra: 0, d9 [01:03:50.294,668] <inf> golioth_settings_autoload: Loading settings [01:03:50.294,873] <dbg> golioth_stream: main: Start Golioth blockwise stream sample [01:03:50.294,913] <inf> golioth_samples: Starting DHCP to obtain IP address [01:03:50.294,996] <inf> golioth_samples: Waiting to obtain IP address [01:03:57.331,030] <inf> net_dhcpv4: Received: 192.168.1.238 [01:03:57.331,490] <inf> golioth_mbox: Mbox created, bufsize: 1320, num_items: 10, item_size: 120 [01:03:57.559,826] <inf> golioth_coap_client_zephyr: Golioth CoAP client connected [01:03:57.559,937] <inf> golioth_stream: Golioth client connected [01:03:57.560,077] <inf> golioth_coap_client_zephyr: Entering CoAP I/O loop [01:03:59.560,104] <inf> golioth_stream: Sending block 0 of 1024 bytes. [01:03:59.602,783] <inf> golioth_stream: Sending block 1 of 349 bytes. Final block. [01:03:59.660,987] <inf> golioth_stream: Blockwise stream completed successfully
And the data can be verified in the LightDB Stream tab of your device on the Golioth web console.
What if Your Data Isn’t In Memory?
Your data may not be in memory that is directly accessible. This is the case when you have stored readings on a filesystem, or when you’re using another library that handles its own memory but provides chunked access to you.
Don’t worry, just update the container struct to store a reference to how this data may be accessed, and add a block buffer when sending the block. (This buffer memory may go out of scope as soon as the Golioth golioth_stream_blockwise_set_block_async()
command returns as the SDK will make a copy of it before the function returns.) While you need to implement this chunk-read function yourself, here’s how our code from above would differ with this approach.
uint8_t block_cache[CONFIG_GOLIOTH_BLOCKWISE_UPLOAD_MAX_BLOCK_SIZE]; enum golioth_status send_next_block(struct block_stream_container *container) { /* unchanged code removed from beginning of function */ read_chunk_from_memory(container->filesystem_uri, offset, container->block_size, block_cache); return golioth_stream_blockwise_set_block_async(container->transfer_ctx, container->block_idx, block_cache, container->block_size, container->is_last, on_stream_block_sent, container); }
Allocating 1024 bytes for a buffer is a lot to put on the heap so here we’ve statically allocated the buffer. It is only accessed from this function.
An Excellent Pipe from Device to Cloud
Getting data from your device fleet to the cloud is something Golioth thinks about all the time. As you’ve seen here, we not only make it easy, but we provide flexibility on both the firmware side and on the cloud side with all the data management tools you will need. Take Golioth for a test drive to day and let us know how it goes by posting to the Golioth forum!
No comments yet! Start the discussion at forum.golioth.io