Today, we’re going to showcase a digital signage demo, using Golioth Connectivity (which enables Bluetooth support). This application also benefits from a small control surface, as in “not a lot of control data is needed to make bigger changes with a device”. This is a microcosm of the larger digital signage industry, which has migrated from simple LED signs up to insanely high resolution screens that shows ads. But the scale from top to bottom requires a lot of the same capabilities and Golioth can help throughout the stack.
Areas that need a high density of low cost nodes are a great fit for Bluetooth demos. The nodes communicate back to the internet through a gateway and each node still gets the capabilities of the Golioth cloud (OTA, settings, data streaming). We’re going to be building with the open source Tikk board to showcase how this might work for a high density traffic area. Let’s look at what we can do:
What’s in the code
The tikk-fleet
project that enables this digital signage demo is based on our Pouch BLE GATT example program. It extends the capabilities by listening for different settings and then acts on those settings using a IS31FL3731 driver for Zephyr. This basically turns settings on the Golioth Cloud into scrolling text or arrows on the device.
For the scrolling text, we explained what that looks like on the last post, so check that out.
For the arrows being shown in this demo, there is a setting handler that looks for one of 4 integer values coming from the cloud. This is used to trigger the directions of the arrows on the IS31FL3731 driver, which ultimately is a set of frames that are stored and replayed to get different motion in the arrows.
static int led_arrow_cb(int new_value) { if (_arrow_dir == new_value) { return 0; } _arrow_dir = new_value; switch (_arrow_dir) { case 0: scroll_arrows(led_dev, false, false); break; case 1: scroll_arrows(led_dev, true, false); break; case 2: scroll_arrows(led_dev, false, true); break; case 3: scroll_arrows(led_dev, true, true); break; default: LOG_WRN("Unsupported scroll settings: %i", new_value); break; } return 0; } GOLIOTH_SETTINGS_HANDLER(ARROW, led_arrow_cb);
Commissioning fleets with certificates
As you’ll see over the coming weeks and months on the Golioth blog, we’re thinking about producing devices for production grade fleets. You may have also noticed that one requirement that is different for Golioth Connectivity (and our new Bluetooth support) is that indirectly connected devices can only validate onto the Golioth platform using certificates. This is different than directly connected devices, which can still prototype using Pre-Shared Keys (PSKs) before going to production with certificates.
For a device like the Tikk, we were able to take the various steps in our Public Key Infrastructure documentation and turn it into a Python script to quickly generate device keys from the Certificate Authority. This is still using a locally generated Root CA–and we highly recommend using a PKI provider to hold your keys–but it lessened the load on generating and pushing the certs onto the Tikk device. Here’s what the Python script looks like:
#!/usr/bin/env python3 import argparse import csv import subprocess import sys from pathlib import Path import shutil import datetime def check_tool(name: str): if shutil.which(name) is None: sys.exit(f"ERROR: Required tool '{name}' not found on PATH.") def run(cmd, cwd=None, dry_run=False): print(">>", " ".join(cmd)) if dry_run: return 0 try: res = subprocess.run(cmd, cwd=cwd, check=True, capture_output=True, text=True) if res.stdout.strip(): print(res.stdout.strip()) if res.stderr.strip(): print(res.stderr.strip()) return res.returncode except subprocess.CalledProcessError as e: print(e.stdout or "", end="") print(e.stderr or "", end="") sys.exit(f"Command failed with exit code {e.returncode}: {' '.join(cmd)}") def make_subject(project_slug: str, cert_id: str) -> str: return f"/O={project_slug}/CN={cert_id}" def prompt_yes_no(prompt: str, default_no=True) -> bool: ans = input(prompt).strip().lower() if ans in ("y", "yes"): return True if ans in ("n", "no"): return False return not default_no # empty -> default to no def prompt_int_with_default(prompt: str, default_val: int) -> int: while True: s = input(f"{prompt} [default {default_val}]: ").strip() if s == "": return default_val try: v = int(s) if v <= 0: print("Please enter a positive integer.") continue return v except ValueError: print("Please enter a valid integer.") def ensure_ca_exists(dry_run: bool): ca_crt = Path("ca.crt.pem") ca_key = Path("ca.key.pem") if ca_crt.exists() and ca_key.exists(): return print("No local CA found (ca.key.pem, ca.crt.pem).") if not prompt_yes_no("Would you like to generate a new Root CA now? [y/N]: "): sys.exit("ERROR: Root CA is required. Provide ca.key.pem and ca.crt.pem or allow generation.") days = prompt_int_with_default("Enter validity period in days for Root CA", 365) print(f"Generating Root CA with validity: {days} days") # Generate CA key run(["openssl", "ecparam", "-name", "prime256v1", "-genkey", "-noout", "-out", "ca.key.pem"], dry_run=dry_run) # Self-signed CA certificate run([ "openssl", "req", "-x509", "-new", "-nodes", "-key", "ca.key.pem", "-sha256", "-subj", "/CN=Root CA", "-days", str(days), "-out", "ca.crt.pem" ], dry_run=dry_run) expiry_date = datetime.datetime.now() + datetime.timedelta(days=days) print(f"Root CA created with validity: {days} days") print(f"Root CA will expire on: {expiry_date.strftime('%Y-%m-%d')}") def provision_one(project_slug: str, cert_id: str, serial_port: str, days: int, mtu: int, outdir: Path, dry_run: bool): device_dir = outdir / cert_id device_dir.mkdir(parents=True, exist_ok=True) # Compute expiry and filenames expiry_date = datetime.datetime.now() + datetime.timedelta(days=days) # Filenames (include cert_id and expiry date: CERTID_YYYY-MM-DD) base_name = f"{cert_id}_{expiry_date.strftime('%Y-%m-%d')}" key_pem = device_dir / f"{base_name}.key.pem" csr_pem = device_dir / f"{base_name}.csr.pem" crt_pem = device_dir / f"{base_name}.crt.pem" key_der = device_dir / f"{base_name}.key.der" crt_der = device_dir / f"{base_name}.crt.der" ca_crt = Path("ca.crt.pem") ca_key = Path("ca.key.pem") if not ca_crt.exists() or not ca_key.exists(): sys.exit("ERROR: ca.crt.pem and/or ca.key.pem not found in current directory.") print(f"\n=== Provisioning {cert_id} on {serial_port} ===") print(f"Output dir: {device_dir}") print(f"Certificate validity: {days} days") print(f"Certificate will expire on: {expiry_date.strftime('%Y-%m-%d')}") # 1) EC private key if not key_pem.exists(): run(["openssl", "ecparam", "-name", "prime256v1", "-genkey", "-noout", "-out", str(key_pem)], dry_run=dry_run) else: print(f"Skip key (exists): {key_pem}") # 2) CSR subj = make_subject(project_slug, cert_id) run(["openssl", "req", "-new", "-key", str(key_pem), "-subj", subj, "-out", str(csr_pem)], dry_run=dry_run) # 3) Sign certificate run([ "openssl", "x509", "-req", "-in", str(csr_pem), "-CA", str(ca_crt), "-CAkey", str(ca_key), "-CAcreateserial", "-out", str(crt_pem), "-days", str(days), "-sha256" ], dry_run=dry_run) # 4) Export DER run(["openssl", "ec", "-in", str(key_pem), "-outform", "DER", "-out", str(key_der)], dry_run=dry_run) run(["openssl", "x509", "-in", str(crt_pem), "-outform", "DER", "-out", str(crt_der)], dry_run=dry_run) # 5) Upload via smpmgr run(["smpmgr", "--port", serial_port, "--mtu", str(mtu), "file", "upload", str(key_der), "/lfs1/credentials/key.der"], dry_run=dry_run) run(["smpmgr", "--port", serial_port, "--mtu", str(mtu), "file", "upload", str(crt_der), "/lfs1/credentials/crt.der"], dry_run=dry_run) print(f"Done: {cert_id} ✔") def parse_csv(csv_path: Path): items = [] with csv_path.open(newline="") as f: reader = csv.DictReader(f) required = {"cert_id", "serial_port"} if not required.issubset(reader.fieldnames or {}): sys.exit(f"ERROR: CSV must have headers: {', '.join(sorted(required))}") for row in reader: cert_id = row["cert_id"].strip() serial_port = row["serial_port"].strip() if not cert_id or not serial_port: print(f"WARNING: Skipping row with missing cert_id or serial_port: {row}") continue items.append((cert_id, serial_port)) return items def main(): ap = argparse.ArgumentParser( description=( "Provision devices with OpenSSL + smpmgr uploads.\n\n" "Files are written under ./provisioned/<CERT_ID>/ using names like\n" "<CERT_ID>_YYYY-MM-DD.key.pem / .csr.pem / .crt.pem / .key.der / .crt.der\n\n" "If run with only --project-slug, the script will check/generate a Root CA and then exit with examples." ) ) ap.add_argument("--project-slug", required=True, help="Organization (O=) field for the certificate subject.") ap.add_argument("--cert-id", help="Device certificate ID (CN=) for single-device mode.") ap.add_argument("--serial-port", help="Serial port for single-device mode (e.g. /dev/ttyACM0, COM5).") ap.add_argument("--csv", type=Path, help="CSV with headers: cert_id,serial_port for batch mode.") ap.add_argument("--days", type=int, default=365, help="Certificate validity in days (default: 365).") ap.add_argument("--mtu", type=int, default=128, help="smpmgr MTU (default: 128).") ap.add_argument("--outdir", type=Path, default=Path("provisioned"), help="Base output directory (default: ./provisioned).") ap.add_argument("--dry-run", action="store_true", help="Print commands without executing.") args = ap.parse_args() check_tool("openssl") check_tool("smpmgr") # Ensure local Root CA exists or offer to generate one ensure_ca_exists(args.dry_run) # If no devices/CSV provided, cleanly exit after CA prep with next-step examples if not args.csv and (not args.cert_id or not args.serial_port): print("\nRoot CA is present and ready.") print("CA files: ca.key.pem, ca.crt.pem") print("\nNext, provision devices in one of two ways:\n") print("Single device:") print(f" python3 provision_devices.py \\\n --project-slug {args.project_slug} \\\n --cert-id DEVICE123 \\\n --serial-port /dev/ttyACM0") print("\nBatch (CSV with headers cert_id,serial_port):") print(f" python3 provision_devices.py \\\n --project-slug {args.project_slug} \\\n --csv devices.csv") sys.exit(0) items = [] if args.csv: items.extend(parse_csv(args.csv)) if args.cert_id and args.serial_port: items.append((args.cert_id, args.serial_port)) timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") print(f"Starting provisioning at {timestamp}") print(f"Project slug: {args.project_slug}") print(f"Devices: {len(items)}") args.outdir.mkdir(parents=True, exist_ok=True) for cert_id, serial_port in items: provision_one(args.project_slug, cert_id, serial_port, args.days, args.mtu, args.outdir, args.dry_run) print("\nAll done. 🎉") if __name__ == "__main__": main()
How this could be modified
While there aren’t a ton of direct commercial use cases for a 25×15 mm LED sign, the idea is the same throughout the industry. Many devices could take our Settings service and apply it to portions of their fleet. As Golioth Connectivity continues to expand, we’ll be looking at other transport mechanisms to get the same encrypted data over other channels to devices (think CAN, LoRa, or even serial).
One important distinction is that each device is directly communicating back through the cloud through a gateway. Effectively the Bluetooth device using Pouch is an internet connected device…just using an additional hop. This is different than the behavior that might be expected for updating a range of signs in a small area all at once. We have shown a BLE Mesh demo in the past that achieves that goal, but has many other challenges, including not actually being enabled by Golioth Connectivity (that demo far preceded our announcement). If we wanted similar behavior on this setup, we could instead implement a schedule, upon which all signs would change. But that requires additional features that deliver time to each device. If you’re interested in time-based updates, please reach out!
We’ll continue to have demos using the Tikk board and Golioth Connectivity. If you have a use case you’d like us to take on, please drop a note on our forum!
No comments yet! Start the discussion at forum.golioth.io