First Step to Start Tracing with OpenTelemetry on Google Cloud

Masato Naka
10 min read · Nov 12, 2024


## Overview

In this blog post, we will explore how to implement tracing using OpenTelemetry on Google Cloud Platform (GCP). We will focus on using Cloud Run, Cloud Tasks, and Pub/Sub to create a service that demonstrates the tracing capabilities provided by OpenTelemetry. This hands-on guide will help you understand the key components and steps required to set up tracing in your application.

## OpenTelemetry Basics

In this post, we won’t go into the details of OpenTelemetry, but here is a quick reminder of the main components:

  1. Exporter: sends telemetry data to a backend (e.g. Jaeger, Datadog, Prometheus, etc.)
  2. TracerProvider: creates Tracers and holds their configuration (e.g. span processor, ID generator, sampler, span limits)
  3. Tracer: the creator of Spans
  4. OpenTelemetry Collector: receives, processes, and exports telemetry data (not used in this post)

For more details, see the official OpenTelemetry documentation.

## Steps

### Prerequisites

Before we begin, make sure to set the following environment variables in your terminal:

export PROJECT=<PROJECT_ID>
export LOCATION_ID=asia-northeast1
export QUEUE_ID=helloworld

Configure your Google Cloud project and compute region:

gcloud config set project $PROJECT
gcloud config set compute/region $LOCATION_ID

### Components

We will create a simple Go application (helloworld) that includes the following endpoints:

1. /helloworld: Returns a simple “Hello, World!” message.
2. /cloudtask: Creates a Cloud Task that sends an HTTP request to the /helloworld endpoint above.

Additionally, we will set up:

  • A Pub/Sub topic with a push subscription that triggers the /cloudtask endpoint of the Cloud Run service.
  • A Cloud Tasks queue that enqueues tasks and invokes the helloworld service.
  • A pull subscription consumed inside the Cloud Run service.
Example components

### Code & Trace

To ensure that trace information is propagated correctly, use the following code snippets.

Publisher Code Example:

The publisher publishes a “hello world” message to the Pub/Sub topic helloworld.

package main

import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"os"

"cloud.google.com/go/pubsub"
"go.opentelemetry.io/contrib/propagators/autoprop"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/api/option"

texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
)

var tracer trace.Tracer

// https://cloud.google.com/stackdriver/docs/instrumentation/setup/go
func setupOpenTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error

// shutdown combines shutdown functions from multiple OpenTelemetry
// components into a single function.
shutdown = func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}

// Configure Context Propagation to use the default W3C traceparent format
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())

// OpenTelemetry Google Cloud Trace Exporter
// https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/main/exporter/trace/README.md
texporter, err := texporter.New(
// Disable spans created by the exporter.
texporter.WithTraceClientOptions(
[]option.ClientOption{option.WithTelemetryDisabled()},
),
)
if err != nil {
return nil, fmt.Errorf("unable to set up tracing: %w", err)
}

tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(texporter))
shutdownFuncs = append(shutdownFuncs, tp.Shutdown)
otel.SetTracerProvider(tp)

// Finally, set the tracer that can be used for this package.
tracer = tp.Tracer("pubsubpublisher")

return shutdown, nil
}

func main() {
ctx := context.Background()

shutdown, err := setupOpenTelemetry(ctx)
if err != nil {
slog.ErrorContext(ctx, "error setting up OpenTelemetry", slog.Any("error", err))
os.Exit(1)
}
// nolint:errcheck
defer shutdown(ctx)

// custom root span
ctx, span := tracer.Start(ctx, "publish")
defer span.End()

projectId := os.Getenv("PROJECT_ID")
if projectId == "" {
log.Fatal("PROJECT_ID must be set")
}
client, err := pubsub.NewClientWithConfig(ctx, projectId, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
})
if err != nil {
log.Fatal(err)
}
topic := client.Topic("helloworld")

attrs := map[string]string{"Content-Type": "application/json"}
msg := &pubsub.Message{Data: []byte("hello world"), Attributes: attrs}
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(msg.Attributes))

for k, v := range msg.Attributes {
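// Prints the attributes set so far, including the manually injected traceparent.
// With EnableOpenTelemetryTracing, the Pub/Sub client also adds googclient_-prefixed
// trace attributes (e.g. googclient_traceparent) when the message is published.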
fmt.Printf("msg.attribute key: %s, value: %s\n", k, v)
}

res := topic.Publish(ctx, msg)
id, err := res.Get(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("published %s\n", id)
}

In this code, setupOpenTelemetry sets up OpenTelemetry. The following part configures the exporter, which sends telemetry data directly to the backend: Cloud Trace in our case.

import (
...
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
)
...

func setupOpenTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) {
...
texporter, err := texporter.New(
// Disable spans created by the exporter.
texporter.WithTraceClientOptions(
[]option.ClientOption{option.WithTelemetryDisabled()},
),
)
...
}

Then we initialize a TracerProvider, which creates Tracers, and register it as the global tracer provider:

tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(texporter))
shutdownFuncs = append(shutdownFuncs, tp.Shutdown)
otel.SetTracerProvider(tp)

Finally, tp.Tracer("pubsubpublisher") creates the Tracer (the creator of Spans), and main uses it to start the custom root span publish.

The following part propagates the span context to the Cloud Run service: the TextMapPropagator injects it into the Pub/Sub message attributes.

attrs := map[string]string{"Content-Type": "application/json"}
msg := &pubsub.Message{Data: []byte("hello world"), Attributes: attrs}
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(msg.Attributes))

The Pub/Sub push subscription must use payload unwrapping with metadata (--push-no-wrapper and --push-no-wrapper-write-metadata, shown in the Deployment section). With that configuration, the trace context carried in msg.Attributes is delivered to the Cloud Run service as HTTP headers such as traceparent.

HTTP Handler Example

main.go:

// Sample run-helloworld is a minimal Cloud Run service.
package main

import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"

cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

"google.golang.org/api/option"

"go.opentelemetry.io/contrib/exporters/autoexport"
"go.opentelemetry.io/contrib/propagators/autoprop"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric"

texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
)

type server struct {
cli *cloudtasks.Client
queue *queue
}

type queue struct {
name string
url string
}

// var tracer trace.Tracer

func main() {
ctx := context.Background()

// OpenTelemetry setup
shutdown, err := setupOpenTelemetry(ctx)
if err != nil {
log.Fatalln(err)
}
// nolint:errcheck
defer shutdown(ctx)

// Cloud Tasks client
c, err := cloudtasks.NewClient(ctx)
if err != nil {
log.Fatalln(err)
}
defer c.Close()

log.Print("starting server...")
q := NewQueue()
srv := &server{c, q}

// Determine port for HTTP service.
port := os.Getenv("PORT")
if port == "" {
port = "8080"
log.Printf("defaulting to port %s", port)
}

// pull subscriber
projectID := os.Getenv("PROJECT_ID")
subID := os.Getenv("PUBSUB_SUBSCRIPTION_ID")
if projectID == "" || subID == "" {
fmt.Println("PROJECT_ID and PUBSUB_SUBSCRIPTION_ID must be set")
os.Exit(1)
}
sampleRate := 1.0

go func() {
if err := subscribeOpenTelemetryTracing(
// os.Stdout implements io.Writer
// and is used to write the output of the program.
// This is useful for debugging.
// In production, use a log file or a service like Stackdriver Logging.
// https://cloud.google.com/logging/docs
os.Stdout,
projectID,
subID,
sampleRate,
); err != nil {
fmt.Printf("subscribeOpenTelemetryTracing: %v\n", err)
}
}()

// Start HTTP server.
log.Printf("listening on port %s", port)
if err := http.ListenAndServe(":"+port, srv); err != nil {
log.Fatal(err)
}
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router := http.NewServeMux()

router.Handle("/cloudtask", otelhttp.NewHandler(http.HandlerFunc(s.createCloudTaskHandler), "createCloudTaskHandler"))
router.Handle("/helloworld", otelhttp.NewHandler(http.HandlerFunc(helloHandler), "helloHandler"))
router.Handle("/", http.HandlerFunc(helloHandler))
router.ServeHTTP(w, r)
}

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
Message struct {
Data []byte `json:"data,omitempty"`
Attributes map[string]string `json:"attributes"`
ID string `json:"id"`
} `json:"message"`
Subscription string `json:"subscription"`
}

// Create a task in queue.name that sends an HTTP request to queue.url
// This is called by pubsub push subscription
func (s *server) createCloudTaskHandler(w http.ResponseWriter, r *http.Request) {

body, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
log.Printf("io.ReadAll: %v", err)
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
fmt.Printf("body: %s\n", body)

carrier := propagation.HeaderCarrier(r.Header)
ctx := otel.GetTextMapPropagator().Extract(r.Context(), carrier)

if s.queue == nil {
fmt.Println("queue is empty")
fmt.Fprint(w, "skipped creating Cloud Tasks task")
return
}

fmt.Println(r.Header.Get("traceparent"))
fmt.Println(r.Header.Get("tracestate"))
fmt.Println(s.queue.url)
req := &cloudtaskspb.CreateTaskRequest{
Parent: s.queue.name,
Task: &cloudtaskspb.Task{
MessageType: &cloudtaskspb.Task_HttpRequest{
HttpRequest: &cloudtaskspb.HttpRequest{
Url: s.queue.url,
HttpMethod: cloudtaskspb.HttpMethod_GET,
Headers: map[string]string{
"traceparent": r.Header.Get("traceparent"),
"tracestate": r.Header.Get("tracestate"),
},
Body: []byte{},
AuthorizationHeader: nil,
},
},
},
ResponseView: 0,
}
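resp, err := s.cli.CreateTask(ctx, req)
if err != nil {
// Report the failure to the caller instead of terminating the whole server.
log.Printf("CreateTask: %v", err)
http.Error(w, "failed to create Cloud Tasks task", http.StatusInternalServerError)
return
}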
fmt.Println(resp)
fmt.Fprint(w, "created Cloud Tasks task")
}

func helloHandler(w http.ResponseWriter, r *http.Request) {
name := os.Getenv("NAME")
if name == "" {
name = "World"
}
fmt.Fprintf(w, "Hello %s!\n", name)
}

func NewQueue() *queue {
projectID := os.Getenv("PROJECT_ID")
locationID := os.Getenv("LOCATION_ID")
queueID := os.Getenv("QUEUE_ID")
targetURL := os.Getenv("CLOUD_TASK_TARGET_URL")
if projectID == "" || locationID == "" || queueID == "" || targetURL == "" {
return nil
}
return &queue{
fmt.Sprintf("projects/%s/locations/%s/queues/%s", projectID, locationID, queueID),
targetURL,
}
}

// https://cloud.google.com/stackdriver/docs/instrumentation/setup/go
func setupOpenTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error

// shutdown combines shutdown functions from multiple OpenTelemetry
// components into a single function.
shutdown = func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}

// Configure Context Propagation to use the default W3C traceparent format
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())

// OpenTelemetry Google Cloud Trace Exporter
// https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/main/exporter/trace/README.md
texporter, err := texporter.New(
// Disable spans created by the exporter.
texporter.WithTraceClientOptions(
[]option.ClientOption{option.WithTelemetryDisabled()},
),
)
if err != nil {
return nil, fmt.Errorf("unable to set up tracing: %w", err)
}

tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(texporter))
shutdownFuncs = append(shutdownFuncs, tp.Shutdown)
otel.SetTracerProvider(tp)

// Configure Metric Export to send metrics as OTLP
mreader, err := autoexport.NewMetricReader(ctx)
if err != nil {
err = errors.Join(err, shutdown(ctx))
return
}
mp := metric.NewMeterProvider(
metric.WithReader(mreader),
)
shutdownFuncs = append(shutdownFuncs, mp.Shutdown)
otel.SetMeterProvider(mp)

return shutdown, nil
}

Here, the HTTP handlers are wrapped with otelhttp for automatic instrumentation:

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router := http.NewServeMux()

router.Handle("/cloudtask", otelhttp.NewHandler(http.HandlerFunc(s.createCloudTaskHandler), "createCloudTaskHandler"))
router.Handle("/helloworld", otelhttp.NewHandler(http.HandlerFunc(helloHandler), "helloHandler"))
router.Handle("/", http.HandlerFunc(helloHandler))
router.ServeHTTP(w, r)
}

The handler also extracts the span context from the request headers into the request context:

carrier := propagation.HeaderCarrier(r.Header)
ctx := otel.GetTextMapPropagator().Extract(r.Context(), carrier)

When creating a Cloud Task, the traceparent and tracestate are propagated via HTTP headers:

req := &cloudtaskspb.CreateTaskRequest{
Parent: s.queue.name,
Task: &cloudtaskspb.Task{
MessageType: &cloudtaskspb.Task_HttpRequest{
HttpRequest: &cloudtaskspb.HttpRequest{
Url: s.queue.url,
HttpMethod: cloudtaskspb.HttpMethod_GET,
Headers: map[string]string{
"traceparent": r.Header.Get("traceparent"),
"tracestate": r.Header.Get("tracestate"),
},
Body: []byte{},
AuthorizationHeader: nil,
},
},
},
ResponseView: 0,
}

The following code implements the Pub/Sub pull subscriber. It lives in the same main package, and main starts it in a goroutine:

// https://cloud.google.com/pubsub/docs/open-telemetry-tracing
package main

import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"cloud.google.com/go/pubsub"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"google.golang.org/api/option"
)

func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
ctx := context.Background()

// TODO: check if tracer provider is necessary
exporter, err := texporter.New(texporter.WithProjectID(projectID),
// Disable spans created by the exporter.
texporter.WithTraceClientOptions(
[]option.ClientOption{option.WithTelemetryDisabled()},
),
)
if err != nil {
return fmt.Errorf("error instantiating exporter: %w", err)
}

resources := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("subscriber"),
)

// Instantiate a tracer provider that samples root spans at sampleRate
// and otherwise follows the parent's sampling decision.
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resources),
sdktrace.WithSampler(
sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate)),
),
)

defer tp.ForceFlush(ctx) // flushes any pending spans
// Note: this replaces the global tracer provider registered by setupOpenTelemetry in main.go.
otel.SetTracerProvider(tp)

// Create a new client with tracing enabled.
client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
})
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()

sub := client.Subscription(subID)

// Receive messages for 10 seconds, which simplifies testing.
// Comment this out in production, since `Receive` should
// be used as a long running operation.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

var received int32
err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
for k, v := range msg.Attributes {
fmt.Fprintf(w, "Attribute: %s = %s\n", k, v)
}
atomic.AddInt32(&received, 1)
msg.Ack()
})
if err != nil {
return fmt.Errorf("sub.Receive: %w", err)
}
fmt.Fprintf(w, "Received %d messages\n", received)

return nil
}

OpenTelemetry tracing is enabled when initializing the Pub/Sub client:

client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
})

### Deployment

#### Step 1: Create Cloud Tasks Queue

First, we need to create a Cloud Tasks queue:

gcloud tasks queues create $QUEUE_ID --location $LOCATION_ID --project $PROJECT

#### Step 2: Deploy Cloud Run Service

Next, we will build and deploy our Cloud Run service:

1. Build and push the container image with ko:

KO_DOCKER_REPO=$LOCATION_ID-docker.pkg.dev/$PROJECT/cloud-run-source-deploy/helloworld ko build --bare ./helloworld

2. Set the deterministic URL for the Cloud Run service:

PROJECT_NUMBER=$(gcloud projects describe $PROJECT --format="value(projectNumber)")
CLOUD_RUN_URL=https://helloworld-$PROJECT_NUMBER.$LOCATION_ID.run.app

3. Deploy the service:

gcloud run deploy helloworld --image $LOCATION_ID-docker.pkg.dev/$PROJECT/cloud-run-source-deploy/helloworld:latest --set-env-vars=CLOUD_TASK_TARGET_URL=${CLOUD_RUN_URL}/helloworld,PROJECT_ID=$PROJECT,LOCATION_ID=$LOCATION_ID,QUEUE_ID=$QUEUE_ID,PUBSUB_SUBSCRIPTION_ID=helloworld-pull,OTEL_SERVICE_NAME=helloworld --allow-unauthenticated --region $LOCATION_ID --project $PROJECT
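If `PROJECT_ID`, `LOCATION_ID`, `QUEUE_ID`, or `CLOUD_TASK_TARGET_URL` is missing, `NewQueue` returns `nil` without saying which one. In that case `/cloudtask` only replies "skipped creating Cloud Tasks task". A small check like the following can help after deploying. `missingEnv` is a hypothetical helper, and the variable names are the ones passed via `--set-env-vars` above.

```go
package main

import (
	"fmt"
	"os"
)

// missingEnv (hypothetical helper) returns the keys for which lookup reports
// no value or an empty value. Pass os.LookupEnv in real code.
func missingEnv(lookup func(string) (string, bool), keys ...string) []string {
	var missing []string
	for _, k := range keys {
		if v, ok := lookup(k); !ok || v == "" {
			missing = append(missing, k)
		}
	}
	return missing
}

func main() {
	required := []string{"PROJECT_ID", "LOCATION_ID", "QUEUE_ID", "CLOUD_TASK_TARGET_URL", "PUBSUB_SUBSCRIPTION_ID"}
	if missing := missingEnv(os.LookupEnv, required...); len(missing) > 0 {
		fmt.Fprintf(os.Stderr, "missing environment variables: %v\n", missing)
		os.Exit(1)
	}
	fmt.Println("all required environment variables are set")
}
```

Taking the lookup function as a parameter keeps the helper testable without touching the real environment.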

You can test the service by accessing its URL:

curl $CLOUD_RUN_URL/helloworld
# Output: Hello World!
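The `CLOUD_RUN_URL` computed in step 2 follows Cloud Run's deterministic URL pattern, `https://SERVICE-PROJECT_NUMBER.REGION.run.app`. Below is the same construction in Go, with a hypothetical `cloudRunURL` helper and a placeholder project number.

```go
package main

import "fmt"

// cloudRunURL (hypothetical helper) builds the deterministic Cloud Run URL
// https://SERVICE-PROJECT_NUMBER.REGION.run.app used for CLOUD_RUN_URL above.
func cloudRunURL(service, projectNumber, region string) string {
	return fmt.Sprintf("https://%s-%s.%s.run.app", service, projectNumber, region)
}

func main() {
	// The Cloud Tasks target is the /helloworld path on that URL.
	fmt.Println(cloudRunURL("helloworld", "123456789012", "asia-northeast1") + "/helloworld")
}
```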

#### Step 3: Set Up Pub/Sub

1. Create a Pub/Sub topic:

gcloud pubsub topics create helloworld --project $PROJECT

2. Create the push subscription that calls the /cloudtask endpoint:

gcloud pubsub subscriptions create helloworld --topic helloworld --push-endpoint ${CLOUD_RUN_URL}/cloudtask --push-no-wrapper --push-no-wrapper-write-metadata --project $PROJECT

--push-no-wrapper-write-metadata: this is necessary to propagate traceparent via the HTTP headers (see the Pub/Sub documentation on payload unwrapping).

3. Create the pull subscription helloworld-pull, which the Cloud Run service consumes:

gcloud pubsub subscriptions create helloworld-pull --topic helloworld --project $PROJECT
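With `--push-no-wrapper-write-metadata`, the `traceparent` attribute arrives as an HTTP header in the W3C Trace Context format `version-traceid-parentid-flags`. In the service, `otelhttp` and the propagator parse it for you. The following standard-library sketch only makes the four fields explicit, which can help when reading the values `createCloudTaskHandler` prints. `parseTraceparent` is a hypothetical helper and handles only version `00`.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// traceparent holds the fields of a W3C traceparent header (version 00).
type traceparent struct {
	TraceID  string // 16 bytes, 32 lowercase hex characters
	ParentID string // 8 bytes, 16 lowercase hex characters
	Sampled  bool   // lowest bit of the trace-flags byte
}

// parseTraceparent (hypothetical helper) validates and splits a version-00 header.
func parseTraceparent(h string) (traceparent, error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return traceparent{}, fmt.Errorf("unsupported traceparent %q", h)
	}
	// Check trace-id, parent-id, and flags: fixed length, lowercase hex.
	for i, n := range []int{32, 16, 2} {
		p := parts[i+1]
		if len(p) != n || strings.ToLower(p) != p {
			return traceparent{}, fmt.Errorf("malformed field %q", p)
		}
		if _, err := hex.DecodeString(p); err != nil {
			return traceparent{}, fmt.Errorf("malformed field %q: %w", p, err)
		}
	}
	// All-zero trace or parent IDs are invalid.
	if strings.Trim(parts[1], "0") == "" || strings.Trim(parts[2], "0") == "" {
		return traceparent{}, fmt.Errorf("all-zero trace or parent ID in %q", h)
	}
	flags, _ := hex.DecodeString(parts[3])
	return traceparent{TraceID: parts[1], ParentID: parts[2], Sampled: flags[0]&0x01 == 1}, nil
}

func main() {
	tp, err := parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("trace=%s parent=%s sampled=%v\n", tp.TraceID, tp.ParentID, tp.Sampled)
}
```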

#### Step 4: Deploy Pub/Sub Publisher Job

Build and deploy the Pub/Sub publisher as a Cloud Run job:

KO_DOCKER_REPO=$LOCATION_ID-docker.pkg.dev/$PROJECT/cloud-run-source-deploy/helloworld-pubsubpublisher ko build --bare ./pubsubpublisher
gcloud run jobs deploy helloworld-pubsubpublisher --image $LOCATION_ID-docker.pkg.dev/$PROJECT/cloud-run-source-deploy/helloworld-pubsubpublisher:latest --set-env-vars=PROJECT_ID=$PROJECT,OTEL_SERVICE_NAME=helloworld-pubsubpublisher --region $LOCATION_ID --project $PROJECT

Execute the job to publish a Pub/Sub message:

gcloud run jobs execute helloworld-pubsubpublisher --region $LOCATION_ID --project $PROJECT

You can check the trace in Trace Explorer in the Google Cloud console. The following spans are connected:

  • publish -> helloworld-pull (pull subscription)
  • publish -> /cloudtask -> createCloudTaskHandler (otelhttp) -> google.cloud.tasks.v2.CloudTasks/Create (CloudTasks) -> /helloworld -> helloHandler (otelhttp)

You will also see some spans that are not connected, e.g. google.pubsub.v1.Publisher/Publish. See the related GitHub issue "pubsub: Publish RPC traces not not in the same tree #1347". According to the issue, this happens because Pub/Sub publishing is batched asynchronously.

### Cleanup

To clean up the resources created during this tutorial, run the following commands:

1. Delete the Cloud Run service:

gcloud run services delete helloworld --region $LOCATION_ID

2. Delete the Cloud Tasks queue:

gcloud tasks queues delete $QUEUE_ID --location $LOCATION_ID --project $PROJECT

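3. Delete the Pub/Sub subscriptions and topic:

gcloud pubsub subscriptions delete helloworld
gcloud pubsub subscriptions delete helloworld-pull
gcloud pubsub topics delete helloworld

4. Delete the Cloud Run job:

gcloud run jobs delete helloworld-pubsubpublisher --region $LOCATION_ID --project $PROJECT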

## Summary

In this tutorial, we’ve implemented tracing with OpenTelemetry on Google Cloud using Cloud Run, Cloud Tasks, and Pub/Sub. By following the steps outlined, you should now have a basic understanding of how to set up tracing in your applications, which can greatly assist in monitoring and debugging.

Feel free to explore the references for more in-depth information and code examples.

## References

GitHub

  1. feat(pubsub): add opentelemetry tracing support #10709
  2. pubsub: extract trace information on push subscriptions #10828 in v1.42.0
  3. propagation/trace_context.go

GCP Cloud Trace with OpenTelemetry

  1. OpenTelemetry + Cloud Trace: https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/main/exporter/trace/README.md
  2. Generate traces and metrics with Go: Link trace spans with logs and metrics
  3. Pub/Sub OpenTelemetry tracing: The context propagation mechanism is only enabled when tracing is turned on and is prepended with the googclient_ prefix.
  4. Publish a message with OpenTelemetry tracing enabled
  5. opentelemetry-cloud-run


Written by Masato Naka

An SRE, mainly working on Kubernetes. CKA (Feb 2021). His Interests include Cloud-Native application development, and machine learning.
