A CSV drops into an S3 bucket, a Go worker picks it up, and twenty minutes later the pod is OOM-killed. The file was 8 GB. The pipeline read the whole thing into a slice before the first transformer ran, fanned out one goroutine per row, and watched the scheduler thrash while memory climbed past the limit. The fix wasn’t a bigger pod — it was a pipeline that streamed and bounded its own concurrency.
That’s the class of bug this post is about. Go is a great language for ETL — cheap concurrency, natural backpressure through channels, a static binary that deploys anywhere — but the defaults that make it easy to prototype make it easy to blow up under load. Unbounded goroutines. io.ReadAll on a response body. A retry loop with no circuit breaker. An infinite channel feeding a slow loader.
I’ve built ETL in Go for terabytes-per-day workloads and torn down enough of my own messes to have opinions about the shape of a pipeline that survives production. This post walks the failure model first, then the patterns that defend against each failure, with Go code you can adapt. Not a drop-in library — a shape. Build your own from these patterns and you’ll understand your pipeline. Copy-paste without understanding and you’ll get paged.
Failure Model First
Before any code, name the failures. Every pattern below defends against something specific. If you can’t name the failure, drop the pattern.
| Failure | Defense |
|---|---|
| Memory blowout on large inputs | Stream, never buffer entire inputs in memory. Bounded channels. |
| Goroutine leak on cancellation | Every send/receive in a select with ctx.Done(). Close channels from the sender side. |
| Unbounded goroutine spawn | Fixed-size worker pool. Semaphore or errgroup.SetLimit. |
| Slow downstream stalls upstream | Buffered channels sized deliberately. Backpressure is a feature. |
| Poison record crashes the stage | Convert panics to error records. Dead-letter queue for failures. |
| Retry storms against a dead dependency | Circuit breaker around retry. Exponential backoff with jitter. |
| Partial failure hidden by “success” | Per-stage counters. Fail-fast vs collect-and-continue is an explicit choice. |
| Re-run double-writes or drops data | Idempotent loaders. Watermarks or dedup keys for replay. |
| Secrets in logs | Structured logs with redaction at the sink. Never log the raw record. |
| Resource leak on early exit | defer close on every file, connection, HTTP body, transaction. |
I’ll reference this table throughout. If a control isn’t defending against a row here, it probably isn’t worth adding.
Streaming vs Batching: The First Decision
Before writing a pipeline, answer one question: does a single input fit comfortably in memory, or not?
- Fits in memory (a few MB, or a dataset whose size you control): batching is fine. Read it, transform the slice, write it. Simpler code, fewer goroutines, easier to reason about.
- Doesn’t fit, or you don’t know the bound (files from users, API responses, Kafka topics, database tables of unknown size): stream. Read one record at a time, move it through the pipeline, never hold the whole dataset.
Most production ETL falls in the second bucket even when you think it’s the first. A 200 MB file from one customer becomes a 20 GB file from another. An API that returns 500 rows today returns 500,000 after a backfill. Default to streaming. Write the streaming version first, and only collapse to batching if the data is genuinely bounded and small.
Streaming in Go means each stage is a function that consumes a channel of records and produces a channel of records. The pipeline is the composition. Backpressure happens naturally: a slow loader blocks on its input channel, which blocks the transformer’s send, which blocks the extractor. No backpressure code to write — the channels are the backpressure.
But that only holds if the channels are bounded. An unbuffered or small-buffered channel gives you real backpressure. A large buffer or an unbounded queue gives you a memory leak with extra steps.
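That claim is easy to demonstrate. The sketch below is mine, not part of any pipeline above: it pushes 2,000 records through a channel with a buffer of 4 against a deliberately slow consumer and measures how many records are ever in flight at once. The bound holds no matter how fast the producer runs.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// peakInFlight pushes n records through a channel with the given buffer
// while a deliberately slow consumer drains it, and reports the largest
// number of records that were "in flight" (produced but not yet
// consumed) at any moment. The blocked send is the backpressure.
func peakInFlight(n, buffer int) int64 {
	ch := make(chan int, buffer)
	var inFlight, peak atomic.Int64

	go func() {
		for i := 0; i < n; i++ {
			inFlight.Add(1)
			ch <- i // blocks once the buffer is full
		}
		close(ch)
	}()

	for range ch {
		if v := inFlight.Load(); v > peak.Load() {
			peak.Store(v)
		}
		inFlight.Add(-1)
		time.Sleep(200 * time.Microsecond) // slow consumer
	}
	return peak.Load()
}

func main() {
	// 2,000 records, buffer of 4: peak stays <= buffer+2 (buffer full,
	// one sender parked mid-send, one record in the consumer's hand).
	fmt.Println(peakInFlight(2000, 4) <= 6) // true
}
```

Swap the buffer for a huge one and the peak tracks the producer instead of the consumer: that is the "memory leak with extra steps."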
The Shape of a Stage
Every stage shares a signature: take a context and an input channel, return an output channel that closes when input closes or context cancels. One rule above all: the stage that creates a channel is the only stage allowed to close it. Close from the sender side, always, or you race a send against a close and panic.
package etl
import "context"
// Record is the unit flowing through the pipeline. Keep it narrow — every
// field is copied at every stage. If you need a large payload, carry a
// reference (path, URL, offset) and fetch lazily at the stage that needs it.
type Record struct {
ID string
Source string
Data map[string]any
Err error // non-nil means poison — the record failed earlier
}
// Stage is the canonical stage signature. In is nil for extractors.
type Stage func(ctx context.Context, in <-chan Record) <-chan Record
I used to design this with separate Extractor, Transformer, Loader interfaces. I stopped. One signature composes more cleanly, tests identically, and removes the ceremony of explaining why an extractor’s Extract returns (chan, error) while the others return chan. Errors at construction go in the constructor; errors at runtime go in the record.
Bounded Concurrency Everywhere
The most common ETL bug I see in Go is a stage that spawns one goroutine per record. It looks cheap — goroutines are a few KB — until your CSV has 20 million rows and the runtime is managing 20 million stacks. Scheduler thrash, GC pressure, memory climbing until the OOM killer arrives.
The fix is a fixed-size worker pool. Spawn N workers once, have them read from a shared channel, done. N is a capacity number you choose deliberately based on the stage’s work — CPU-bound transform? GOMAXPROCS. I/O-bound loader? Higher, sized by the downstream’s concurrency limit.
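That sizing rule fits in one function. The 4x multiplier for I/O-bound stages below is a placeholder assumption, not a recommendation; in practice you size from the downstream's real concurrency limit:

```go
package main

import (
	"fmt"
	"runtime"
)

// workerCount sketches the sizing rule: CPU-bound stages get one worker
// per logical CPU; I/O-bound stages get more, capped by what the
// downstream can absorb. The 4x multiplier is illustrative only.
func workerCount(ioBound bool, downstreamLimit int) int {
	n := runtime.GOMAXPROCS(0)
	if !ioBound {
		return n
	}
	if want := 4 * n; want < downstreamLimit {
		return want
	}
	return downstreamLimit
}

func main() {
	fmt.Println(workerCount(false, 100) >= 1) // true: at least one worker
	fmt.Println(workerCount(true, 8) <= 8)    // true: never exceeds downstream
}
```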
Here’s a CSV extractor that streams and bounds. It reads rows serially (the csv.Reader is not concurrency-safe anyway), fans them out to a fixed worker pool for parsing, and every send is guarded by ctx.Done():
package etl
import (
"context"
"encoding/csv"
"fmt"
"io"
"os"
"sync"
)
type CSVExtractor struct {
Path string
HasHeaders bool
Workers int // fixed pool size — DO NOT scale with row count
BufferSize int // output channel capacity, sized for backpressure
}
func (e *CSVExtractor) Extract(ctx context.Context) <-chan Record {
out := make(chan Record, e.BufferSize)
go func() {
defer close(out)
f, err := os.Open(e.Path)
if err != nil {
sendOrDrop(ctx, out, Record{Err: fmt.Errorf("open: %w", err)})
return
}
defer f.Close()
r := csv.NewReader(f)
r.ReuseRecord = true // avoid per-row allocation
var headers []string
if e.HasHeaders {
row, err := r.Read()
if err != nil {
sendOrDrop(ctx, out, Record{Err: fmt.Errorf("headers: %w", err)})
return
}
headers = append([]string(nil), row...) // copy: ReuseRecord reuses the slice
}
// Unbuffered hand-off channel. The reader blocks until a worker
// is ready. That blocking IS the backpressure.
rows := make(chan []string)
var wg sync.WaitGroup
for i := 0; i < e.Workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for row := range rows {
rec := toRecord(e.Path, headers, row)
select {
case <-ctx.Done():
return
case out <- rec:
}
}
}()
}
// Reader loop: single producer, fans out to workers.
for {
// Check cancel BEFORE blocking on Read.
if ctx.Err() != nil {
break
}
row, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
sendOrDrop(ctx, out, Record{Err: fmt.Errorf("read row: %w", err)})
break
}
// Copy before handoff — ReuseRecord means the next Read overwrites row.
rowCopy := append([]string(nil), row...)
select {
case <-ctx.Done():
// Closing rows unblocks any worker parked on range.
close(rows)
wg.Wait()
return
case rows <- rowCopy:
}
}
close(rows)
wg.Wait()
}()
return out
}
// sendOrDrop sends on out, or returns if ctx is canceled. Never blocks forever.
func sendOrDrop(ctx context.Context, out chan<- Record, r Record) {
select {
case <-ctx.Done():
case out <- r:
}
}
func toRecord(src string, headers, row []string) Record {
data := make(map[string]any, len(row))
if len(headers) > 0 {
for i, v := range row {
if i < len(headers) {
data[headers[i]] = v
}
}
} else {
for i, v := range row {
data[fmt.Sprintf("col_%d", i)] = v
}
}
return Record{Source: src, Data: data}
}
Three things to internalize from this:
- The worker pool is fixed. It doesn't grow with input size. A 20 GB CSV uses the same `Workers` goroutines as a 20 MB one. Memory cost of concurrency is decoupled from data volume.
- Every `case <-ctx.Done()` is there to prevent a goroutine leak, not just to abort work. A worker stuck trying to send on a full `out` channel that nobody is reading is a leaked goroutine. Context cancel unblocks it.
- The reader must check `ctx.Err()` before calling `Read()` — otherwise a canceled pipeline keeps reading the file until EOF before noticing, which on an 8 GB file is a lot of wasted IO.
When to use this: any streaming source where you parse records one-at-a-time. CSV, JSONL, Parquet row-group iteration, line-oriented logs.
When not to use this: if your input is 200 rows of config data, just read it into a slice. The worker pool is overkill for bounded-tiny inputs.
Streaming the API Extractor
The API extractor is where I see io.ReadAll(resp.Body) show up most. Innocuous in unit tests. In production, your provider returns a 2 GB paginated blob in one response and your pod dies.
Two defenses, both of which you want:
- Stream the body with `json.Decoder`. Decode one element at a time off the wire, don't materialize the whole response.
- Paginate. If the API supports cursors or page tokens, use them. A single request should never represent "all the data."
Here’s the streaming, paginating version:
package etl
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
)
type APIExtractor struct {
BaseURL string
Client *http.Client // shared, with timeouts configured
BufferSize int
PageParam string // e.g., "cursor" or "page_token"
MaxBodyBytes int64 // hard cap on response size per page; 0 means a safe default
}
// apiPage is the generic paginated response shape. Adapt to your API.
type apiPage struct {
Items []json.RawMessage `json:"items"`
NextPage string `json:"next_page"`
}
func (e *APIExtractor) Extract(ctx context.Context) <-chan Record {
out := make(chan Record, e.BufferSize)
go func() {
defer close(out)
cursor := ""
for {
if err := ctx.Err(); err != nil {
return
}
page, err := e.fetchPage(ctx, cursor)
if err != nil {
// Do NOT log the raw cursor or its length: page tokens are
// opaque and may encode session/auth context, and raw length
// can still leak structure across correlated requests. Emit
// an 8-char SHA-256 prefix when present, "empty" otherwise.
sendOrDrop(ctx, out, Record{Err: fmt.Errorf("fetch page (%s): %w", cursorTag(cursor), err)})
return
}
for _, raw := range page.Items {
rec := Record{Source: e.BaseURL, Data: map[string]any{"raw": raw}}
select {
case <-ctx.Done():
return
case out <- rec:
}
}
if page.NextPage == "" {
return
}
cursor = page.NextPage
}
}()
return out
}
func (e *APIExtractor) fetchPage(ctx context.Context, cursor string) (*apiPage, error) {
u, err := url.Parse(e.BaseURL)
if err != nil {
return nil, err
}
if cursor != "" {
q := u.Query()
q.Set(e.PageParam, cursor)
u.RawQuery = q.Encode()
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
resp, err := e.Client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("status %d", resp.StatusCode)
}
// Cap the body. A hostile or broken server can stream forever, and
// json.Decoder will happily grow the target until the pod is OOM-killed.
// http.MaxBytesReader enforces the limit DURING the read: once the cap
// is hit, Decode returns a *http.MaxBytesError mid-stream instead of
// completing with an oversized struct already materialized.
//
// Do NOT "probe for overflow" by wrapping io.LimitReader and calling
// io.Copy after Decode — json.Decoder has an internal buffer, so by the
// time the probe runs the oversized body is already in memory. The cap
// has to be enforced BEFORE the decoder reads.
limit := e.MaxBodyBytes
if limit <= 0 {
limit = 32 << 20 // 32 MiB default per page
}
resp.Body = http.MaxBytesReader(nil, resp.Body, limit)
var page apiPage
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) {
return nil, fmt.Errorf("response exceeds %d bytes", limit)
}
return nil, fmt.Errorf("decode: %w", err)
}
return &page, nil
}
// cursorTag returns a bounded, non-reversible marker for logs. Never log the
// raw cursor — it may carry session state.
func cursorTag(cursor string) string {
if cursor == "" {
return "cursor=empty"
}
sum := sha256.Sum256([]byte(cursor))
return "cursor=" + hex.EncodeToString(sum[:4]) // 8 hex chars
}
Note what’s not here: no io.ReadAll, no in-memory accumulation across pages, no goroutine fan-out before we even have records. One HTTP request at a time, size-capped at the transport, decoded into a single page, then pushed to the output channel under backpressure.
Be honest about the streaming boundary. `json.NewDecoder(body).Decode(&page)` reads until the top-level object closes and materializes every `Items[i]` into the struct — it's streamed off the socket but buffered in the struct. That's fine for bounded pages. It is not streaming-per-item. If a single page can be multi-gigabyte you need `decoder.Token()` to walk the JSON stream and emit each array element individually. For the paginated REST shape, per-page bounding (via `http.MaxBytesReader`) is the right cut.
The HTTP client is passed in, not constructed inside. That’s deliberate — a real http.Client needs configured Timeout, connection pool sizing, and TLS — all things that belong in the caller’s wiring, not buried in an extractor constructor.
When to use this: any paginated REST API, especially ones where you don’t control the response size.
When not to use this: streaming providers (Kafka, Pub/Sub, Kinesis) have their own client libraries that handle paging and backpressure natively. Use those. Don’t reimplement with HTTP.
Transformers: Keep Them Pure, Bound the Fan-Out
A transformer stage maps records. The temptation is to spawn a goroutine per record for parallelism. Don’t. Same rule as the extractor: fixed-size worker pool, fan-in to a shared output channel.
The cleanest implementation I’ve found uses errgroup with SetLimit:
package etl
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
// safeCall invokes fn and converts a panic into an error record. The
// failure-model table promises "convert panics to error records" — this is
// the single choke point that makes that promise true. Without it, one bad
// transform crashes the whole worker and eventually the process.
func safeCall(
ctx context.Context,
r Record,
fn func(context.Context, Record) (Record, error),
) (out Record, err error) {
defer func() {
if p := recover(); p != nil {
out = Record{ID: r.ID, Source: r.Source}
err = fmt.Errorf("panic in transform: %v", p)
}
}()
return fn(ctx, r)
}
// ParallelMap runs fn on each input record with a bounded worker pool.
// fn must be safe to call concurrently. Returning an error from fn marks
// the record as poison (Err set) — it does NOT cancel the group. A panic
// in fn is converted to a poison record via safeCall; it does not crash
// the process or wedge the stage.
func ParallelMap(
ctx context.Context,
in <-chan Record,
workers, bufferSize int,
fn func(context.Context, Record) (Record, error),
) <-chan Record {
out := make(chan Record, bufferSize)
go func() {
defer close(out)
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(workers) // BOUND. This is the whole point.
for rec := range in {
// Pass-through records that are already poison.
if rec.Err != nil {
select {
case <-gctx.Done():
// Drain in-flight workers before the deferred close(out)
// fires, or a worker's send on out races the close and panics.
_ = g.Wait()
return
case out <- rec:
}
continue
}
rec := rec // capture
g.Go(func() error {
result, err := safeCall(gctx, rec, fn)
if err != nil {
result = Record{ID: rec.ID, Source: rec.Source, Err: err}
}
select {
case <-gctx.Done():
return gctx.Err()
case out <- result:
return nil
}
})
}
_ = g.Wait()
}()
return out
}
Two decisions worth naming:
- `errgroup.SetLimit(workers)` blocks `g.Go` when the pool is full. That's what makes this bounded — the for-range loop over `in` pauses at `g.Go`, which back-pressures up the pipeline.
- A failed `fn` does not cancel the group. This is the collect-and-continue policy: bad records become poison records and flow downstream for dead-letter handling. The alternative (fail-fast) would return the error from `g.Go` and cancel the group — correct for some pipelines, wrong for most ETL. Name the policy explicitly.
Retry With a Circuit Breaker, Not Without
Transformers that call external services (enrichment APIs, geocoders, ML inference) need retry. Retry alone is wrong. If the downstream is dead, a naive retry loop turns a dead dependency into a DDoS against itself — every record retries 3× with backoff, you have N workers doing this, and when the service comes back up it immediately drowns.
The pattern is retry inside a circuit breaker. Retry handles transient blips; the breaker handles sustained outage by short-circuiting requests for a cooldown period.
package etl
import (
"context"
"errors"
"math/rand/v2"
"sync"
"time"
)
// Breaker states: closed (normal), open (short-circuit), half-open (one probe in flight).
type breakerState int
const (
stateClosed breakerState = iota
stateOpen
stateHalfOpen
)
type Breaker struct {
mu sync.Mutex
state breakerState
failures int
openedAt time.Time
Threshold int // consecutive failures to open
CooldownFor time.Duration // how long to stay open before a probe
}
var (
ErrBreakerOpen = errors.New("circuit breaker open")
// errPanic is a belt-and-suspenders sentinel. safeCall already converts
// panics inside fn into error records, so the user-supplied function
// can't wedge the breaker on its own. errPanic covers the narrower
// case where safeCall's own wrapper (or the framework code around it)
// panics before fn even runs: the outer deferred Record() still fires
// with a non-nil error instead of leaving the half-open state stuck.
errPanic = errors.New("panic in fn")
)
// NewBreaker builds a breaker with sane defaults. A zero-value Breaker{}
// has Threshold=0 and CooldownFor=0, which means "trip on the first error
// and never recover" — almost never what the caller intended. Prefer this
// constructor; it rejects zero/negative values and makes intent explicit.
func NewBreaker(threshold int, cooldown time.Duration) *Breaker {
if threshold <= 0 {
threshold = 5
}
if cooldown <= 0 {
cooldown = 30 * time.Second
}
return &Breaker{Threshold: threshold, CooldownFor: cooldown}
}
// Allow reserves a slot. Returns true if the caller may proceed.
// In half-open, admits exactly one probe — all other callers see false until
// the probe's Record() resolves the state.
func (b *Breaker) Allow() bool {
b.mu.Lock()
defer b.mu.Unlock()
// Defensive defaults for zero-value Breaker{}. Prefer NewBreaker, but
// don't make a copy-paste Breaker{} trip on the first error.
if b.Threshold <= 0 {
b.Threshold = 5
}
if b.CooldownFor <= 0 {
b.CooldownFor = 30 * time.Second
}
switch b.state {
case stateClosed:
return true
case stateOpen:
if time.Since(b.openedAt) <= b.CooldownFor {
return false
}
// Cooldown elapsed: promote to half-open and admit this caller as the probe.
b.state = stateHalfOpen
return true
case stateHalfOpen:
// A probe is already in flight. Everyone else waits.
return false
}
return false
}
func (b *Breaker) Record(err error) {
b.mu.Lock()
defer b.mu.Unlock()
if err == nil {
b.state = stateClosed
b.failures = 0
return
}
switch b.state {
case stateHalfOpen:
// Probe failed. Re-open with a fresh cooldown.
b.state = stateOpen
b.openedAt = time.Now()
case stateClosed:
b.failures++
if b.failures >= b.Threshold {
b.state = stateOpen
b.openedAt = time.Now()
}
}
}
// IsRetryable decides whether an error justifies another attempt. Callers
// should override for their error taxonomy (e.g. don't retry 4xx, context
// cancel, or domain errors you never want to retry).
type IsRetryable func(error) bool
// WithRetry wraps fn with bounded retry + circuit breaker.
// Max attempts, exponential backoff with full jitter, honors context cancel.
// If retryable is nil, every non-nil error is retried (up to maxAttempts).
func WithRetry(
b *Breaker,
maxAttempts int,
retryable IsRetryable,
fn func(context.Context, Record) (Record, error),
) func(context.Context, Record) (Record, error) {
return func(ctx context.Context, r Record) (Record, error) {
var lastErr error
for attempt := 0; attempt < maxAttempts; attempt++ {
if !b.Allow() {
return r, ErrBreakerOpen
}
// Allow() may have promoted the breaker to half-open. If fn
// panics before Record() runs, the breaker would wedge — every
// subsequent Allow() would see half-open and refuse traffic
// forever. safeCall converts the panic to a poison error AND
// the defer guarantees Record() fires on every exit path,
// so the breaker is always resolved.
out, err := func() (result Record, outErr error) {
outErr = errPanic // pessimistic default for the defer
defer func() {
// Caller-initiated cancellation is NOT a dependency
// failure. If we count ctx.Canceled / DeadlineExceeded
// toward the breaker, a pipeline shutdown, drain, or
// upstream timeout will trip a healthy breaker — and
// the next retry (or the retries of the sibling
// workers) will see ErrBreakerOpen and mark records
// as poison that were really just shutdown casualties.
// Classify before recording.
if errors.Is(outErr, context.Canceled) || errors.Is(outErr, context.DeadlineExceeded) {
// One wrinkle: if THIS call was the half-open probe,
// skipping Record would leave the breaker half-open
// forever, since Allow admits nobody else. Same package,
// so resolve it in place: an aborted probe re-opens the
// breaker for a fresh cooldown.
b.mu.Lock()
if b.state == stateHalfOpen {
b.state = stateOpen
b.openedAt = time.Now()
}
b.mu.Unlock()
return
}
b.Record(outErr)
}()
result, outErr = safeCall(ctx, r, fn)
return
}()
if err == nil {
return out, nil
}
lastErr = err
if retryable != nil && !retryable(err) {
return r, err // permanent error — don't waste attempts or breaker budget
}
// Backoff: 2^attempt * 100ms, capped at 5s, with full jitter.
// math/rand/v2 is goroutine-safe and per-P seeded.
backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
if backoff > 5*time.Second {
backoff = 5 * time.Second
}
sleep := time.Duration(rand.Int64N(int64(backoff))) // full jitter, always >= 0
// time.After would leak the underlying timer until it fires,
// which is wasteful when ctx cancels early in a tight loop.
// A named timer with defer Stop() releases it immediately.
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return r, ctx.Err()
case <-timer.C:
}
}
return r, lastErr
}
}
Three details worth naming:
- Half-open admits exactly one probe. The earlier version decremented `failures` on cooldown and let every concurrent caller through, which is a breaker stampede — every worker hammers the recovering dependency at the same time. A real breaker needs a distinct half-open state that holds all but one caller out until the probe resolves.
- `retryable` filters permanent errors. Don't retry a 400 Bad Request or a 401 Unauthorized. Wasted attempts burn your retry budget and, worse, a stream of 401s can get your client IP rate-limited or banned. Pass a predicate that returns `true` only for transient classes (connection reset, 5xx, timeouts).
- Full jitter, not "backoff plus jitter". Full jitter (`rand.Int64N(backoff)`) spreads retries uniformly across the window and de-synchronizes workers. "Backoff + jitter" leaves a floor that keeps retries clustered. The AWS Architecture Blog post on this is worth the 10 minutes.
Retry budget matters. maxAttempts=3 with a 3-worker pool can issue 9 concurrent retries against a struggling dependency. Cap your fan-out and your attempts together. I default to maxAttempts=3, Threshold=5, CooldownFor=30s for enrichment APIs and tune from there — which is exactly what NewBreaker hands you.
Caller cancellation is not a dependency failure. The WithRetry wrapper above explicitly ignores context.Canceled and context.DeadlineExceeded when recording into the breaker. Without that rule, a pipeline shutdown or a draining timeout will count against your breaker threshold, trip a healthy downstream, and the next retry will see ErrBreakerOpen and mark the record as poison. You end up with “failed” records in your dead-letter sink that were really just in-flight at shutdown. State this as a contract: breakers only count failures attributable to the dependency.
When to use this: any transformer calling a service you don’t own. Geocoding, enrichment, ML inference, external validators.
When not to use this: pure CPU transforms (parse a date, compute a hash, normalize a string). Retry buys nothing when there’s no transient failure class.
Validation at the Boundary, Typed
The original version of this pipeline took validation rules as interface{} values with ad-hoc type assertions inside each rule. That’s fragile — a typo in a rule config throws a runtime panic three hours into a batch. Validation belongs at the edge of trust, and it should be typed.
I keep validation thin: a rule is a function, rules compose, and a failing rule converts the record to poison rather than halting the stage.
package etl
import (
"fmt"
"reflect"
)
type Rule func(Record) error
// RequiredString checks that a map field is present and a non-empty string.
// Use this when the field's expected type is string. The untyped Required
// below accepts any type but leans on reflect for the "zero value" check —
// pick the one that matches your schema discipline.
func RequiredString(field string) Rule {
return func(r Record) error {
v, ok := r.Data[field]
if !ok || v == nil {
return fmt.Errorf("field %q required", field)
}
s, ok := v.(string)
if !ok {
return fmt.Errorf("field %q: expected string, got %T", field, v)
}
if s == "" {
return fmt.Errorf("field %q required", field)
}
return nil
}
}
// Required checks that a map field is present and not the Go zero value for
// its dynamic type. Handles string, numeric, slice, map, pointer, and
// interface kinds via reflect.IsZero. If you only care about strings, prefer
// RequiredString — the explicit assertion is cheaper and clearer.
func Required(field string) Rule {
return func(r Record) error {
v, ok := r.Data[field]
if !ok || v == nil {
return fmt.Errorf("field %q required", field)
}
if reflect.ValueOf(v).IsZero() {
return fmt.Errorf("field %q required", field)
}
return nil
}
}
func MaxLen(field string, max int) Rule {
return func(r Record) error {
s, ok := r.Data[field].(string)
if !ok {
return fmt.Errorf("field %q: expected string", field)
}
if len(s) > max {
return fmt.Errorf("field %q: len %d > %d", field, len(s), max)
}
return nil
}
}
func All(rules ...Rule) Rule {
return func(r Record) error {
for _, rule := range rules {
if err := rule(r); err != nil {
return err
}
}
return nil
}
}
Usage inside a transformer:
validate := All(
Required("email"),
Required("user_id"),
MaxLen("email", 255),
)
transform := ParallelMap(ctx, in, 4, 64, func(ctx context.Context, r Record) (Record, error) {
if err := validate(r); err != nil {
return r, err // becomes poison downstream
}
return r, nil
})
Two caveats before you paste this. First, Required uses reflect.ValueOf(v).IsZero() so it catches "", 0, nil maps and slices, and zero pointers — but that reflect call costs nanoseconds per field and will surprise you on custom types with non-obvious zero semantics. For schemas that are map-of-string, RequiredString is the honest choice: explicit type assertion, no reflection, no ambiguity. Second, every rule that reaches into r.Data[...] is doing an untyped lookup that you will get wrong in production. This is exactly why the next paragraph exists.
If you want schema-level rigor, promote Data map[string]any to a typed struct per pipeline and let Go’s type system do the work. map[string]any is the right shape for a generic framework; it’s the wrong shape for your specific pipeline. Bind to types at the first stage where you know the schema.
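Concretely, "bind to types at the first stage" can look like the sketch below. The `User` schema and `bindUser` helper are invented for illustration, with `Record` trimmed to the fields the example needs:

```go
package main

import "fmt"

// Record mirrors the post's generic shape, trimmed for the example.
type Record struct {
	Data map[string]any
	Err  error
}

// User is a hypothetical pipeline-specific schema. Once a record is
// bound, later stages work with fields instead of map lookups, and the
// compiler catches the typos that Required("emial") never would.
type User struct {
	ID    string
	Email string
}

// bindUser is the trust boundary: every untyped lookup happens exactly
// once, here, and everything downstream is type-checked.
func bindUser(r Record) (User, error) {
	id, ok := r.Data["id"].(string)
	if !ok || id == "" {
		return User{}, fmt.Errorf("field %q: expected non-empty string, got %T", "id", r.Data["id"])
	}
	email, ok := r.Data["email"].(string)
	if !ok || email == "" {
		return User{}, fmt.Errorf("field %q: expected non-empty string, got %T", "email", r.Data["email"])
	}
	return User{ID: id, Email: email}, nil
}

func main() {
	good := Record{Data: map[string]any{"id": "u1", "email": "a@example.com"}}
	bad := Record{Data: map[string]any{"id": 42, "email": "a@example.com"}}

	u, err := bindUser(good)
	fmt.Println(u.ID, err == nil) // u1 true
	_, err = bindUser(bad)
	fmt.Println(err != nil) // true
}
```

A binding stage like this slots in right after the extractor; records that fail to bind become poison records, same as any other validation failure.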
The Loader: Idempotency Is Non-Negotiable
If your loader isn’t idempotent, your pipeline isn’t replayable. And if it isn’t replayable, every failure becomes an incident. Idempotent loaders are the difference between “rerun the job” and “page the on-call, manually reconcile.”
Two tactics that cover most cases:
- Upsert by a stable key. `INSERT ... ON CONFLICT (key) DO UPDATE` in Postgres, `MERGE` in most warehouses. The record's natural identity is the dedup key.
- Watermarks. Record the highest input timestamp/offset successfully loaded. On restart, resume from the watermark, not from zero.
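A minimal watermark sketch. The in-memory store stands in for a real table (say `watermarks(pipeline, offset)`, an assumed schema); the resume-and-advance logic is the part that transfers:

```go
package main

import (
	"fmt"
	"sync"
)

// WatermarkStore persists the highest offset successfully loaded per
// pipeline. In production this is a table or a key in a state backend;
// the map just keeps the sketch runnable.
type WatermarkStore struct {
	mu sync.Mutex
	m  map[string]int64
}

func NewWatermarkStore() *WatermarkStore {
	return &WatermarkStore{m: map[string]int64{}}
}

func (s *WatermarkStore) Get(pipeline string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.m[pipeline] // zero value means "never ran": start from the beginning
}

// Advance only moves forward, so a replayed batch carrying older offsets
// can never drag the watermark backwards.
func (s *WatermarkStore) Advance(pipeline string, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if offset > s.m[pipeline] {
		s.m[pipeline] = offset
	}
}

// load simulates one run: skip anything at or below the watermark, load
// the rest, advance. Re-running the same input loads nothing twice.
func load(s *WatermarkStore, pipeline string, offsets []int64) (loaded int) {
	wm := s.Get(pipeline)
	for _, off := range offsets {
		if off <= wm {
			continue // already loaded in a previous run
		}
		loaded++
		s.Advance(pipeline, off)
	}
	return loaded
}

func main() {
	s := NewWatermarkStore()
	fmt.Println(load(s, "users", []int64{1, 2, 3})) // 3
	// Crash, restart, replay the same input plus one new record:
	fmt.Println(load(s, "users", []int64{1, 2, 3, 4})) // 1
}
```

In a real loader, advance the watermark only after the batch transaction commits; advancing before the write reintroduces the drop-on-crash failure mode.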
Batch the writes. One INSERT per record is usually the single biggest performance hit in a pipeline. Buffer records in the loader, flush on a size or time trigger.
package etl
import (
"context"
"database/sql"
"fmt"
"time"
)
type BatchLoader struct {
DB *sql.DB
Stmt string // parameterized upsert
BatchSize int
FlushEvery time.Duration
}
func (l *BatchLoader) Load(ctx context.Context, in <-chan Record) <-chan Record {
out := make(chan Record, l.BatchSize)
go func() {
defer close(out)
batch := make([]Record, 0, l.BatchSize)
flushEvery := l.FlushEvery
if flushEvery <= 0 {
flushEvery = time.Second // defensive default: NewTicker panics on non-positive periods
}
ticker := time.NewTicker(flushEvery)
defer ticker.Stop()
flush := func() {
if len(batch) == 0 {
return
}
// write() tries the batch, then falls back to per-row so only
// the actually-bad rows come back with Err set. Emit each
// record downstream with whatever status it ended up in —
// successful rows flow as-is, failed rows become poison.
_ = l.write(ctx, batch)
for _, r := range batch {
sendOrDrop(ctx, out, r)
}
batch = batch[:0]
}
for {
select {
case <-ctx.Done():
return
case rec, ok := <-in:
if !ok {
flush()
return
}
if rec.Err != nil {
// Route poison records straight to dead letter.
sendOrDrop(ctx, out, rec)
continue
}
// Type-validate BEFORE the record enters a batch. One bad
// record carrying the wrong dynamic type poisons the whole INSERT
// if we only find out at ExecContext time.
if err := validateRow(rec); err != nil {
rec.Err = err
sendOrDrop(ctx, out, rec)
continue
}
batch = append(batch, rec)
if len(batch) >= l.BatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}()
return out
}
// validateRow enforces the driver-facing type contract. Never pass
// r.Data[k] straight to ExecContext — map[string]any values arrive from
// upstream transforms with whatever dynamic type the extractor produced,
// and a single int-where-string-was-expected turns the whole batch into
// a rollback. Validate at batch-append time so the bad record is isolated.
func validateRow(r Record) error {
id, ok := r.Data["id"].(string)
if !ok || id == "" {
return fmt.Errorf("field %q: expected non-empty string, got %T", "id", r.Data["id"])
}
email, ok := r.Data["email"].(string)
if !ok || email == "" {
return fmt.Errorf("field %q: expected non-empty string, got %T", "email", r.Data["email"])
}
return nil
}
func (l *BatchLoader) write(ctx context.Context, batch []Record) error {
if err := l.writeBatch(ctx, batch); err == nil {
return nil
}
// Batch failed. The cause is either a per-row error (constraint
// violation, malformed value the pre-validator didn't catch) or a
// transient infra issue that will fail the row-by-row path too.
// Either way, isolate: write each row in its own tiny transaction so
// one poison record doesn't take down 999 valid siblings.
return l.writePerRow(ctx, batch)
}
func (l *BatchLoader) writeBatch(ctx context.Context, batch []Record) error {
tx, err := l.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() // no-op after commit
stmt, err := tx.PrepareContext(ctx, l.Stmt)
if err != nil {
return err
}
defer stmt.Close()
for _, r := range batch {
if _, err := stmt.ExecContext(ctx, r.Data["id"], r.Data["email"]); err != nil {
return err
}
}
return tx.Commit()
}
// writePerRow retries a failed batch as individual statements so one bad
// record is quarantined instead of rolling back every sibling. Mutates
// batch[i].Err for rows that fail; returns the last error seen so the
// caller can still surface a retryable classification.
func (l *BatchLoader) writePerRow(ctx context.Context, batch []Record) error {
stmt, err := l.DB.PrepareContext(ctx, l.Stmt)
if err != nil {
return err
}
defer stmt.Close()
var lastErr error
for i := range batch {
r := batch[i]
if _, err := stmt.ExecContext(ctx, r.Data["id"], r.Data["email"]); err != nil {
batch[i].Err = err
lastErr = err
}
}
return lastErr
}
One transaction per batch, prepared statement reused, deferred rollback for the fail path. That’s the loader primitive. Scale it up by running multiple loader goroutines in parallel if your target supports concurrent writes — OLTP databases with lock contention on hot rows often don’t, analytical warehouses usually do.
Two things this code does that deserve a second look. First, type-validation lives at batch-append time, not inside the transaction — r.Data["id"] and r.Data["email"] are validated to be non-empty strings before the record ever reaches the driver. A single int-where-string-was-expected would otherwise roll back the entire batch from inside ExecContext. Second, when a batched transaction fails, the code falls back to row-by-row execution so the actually-bad record is isolated instead of poisoning 999 valid siblings. That’s the opposite of “any error → poison the whole batch,” which is how you silently lose data during an incident. For transient infra errors (connection reset mid-batch) the row-by-row path will also fail — the correct response is still to wrap write in the same retry-with-breaker described earlier, and only mark as poison once the retry budget is exhausted. Name the policy explicitly for your pipeline.
Observability: Per-Stage Counters, Not Debug Prints
The single most useful observability signal you can add to an ETL pipeline is a counter on every stage for records in, records out, records errored. Three numbers per stage. Export them to Prometheus or stdout. The moment records-out diverges from records-in somewhere, you know exactly which stage to look at.
Wrap each stage with a metered middleware:
package etl
import (
"context"
"sync/atomic"
)
type Metrics struct {
In, Out, Errors atomic.Int64
}
func Metered(name string, m *Metrics, stage Stage) Stage {
return func(ctx context.Context, in <-chan Record) <-chan Record {
counted := make(chan Record)
go func() {
defer close(counted)
for r := range in {
// Increment only when the record actually forwards.
// Counting before the select would inflate In on
// ctx-cancel-mid-handoff — the metric would claim we
// processed records we never shipped downstream.
select {
case <-ctx.Done():
return
case counted <- r:
m.In.Add(1)
}
}
}()
out := stage(ctx, counted)
final := make(chan Record)
go func() {
defer close(final)
		for r := range out {
			select {
			case <-ctx.Done():
				return
			case final <- r:
				// Same rule as In: count only on successful handoff,
				// so a ctx-cancel mid-send doesn't inflate Out.
				if r.Err != nil {
					m.Errors.Add(1)
				} else {
					m.Out.Add(1)
				}
			}
		}
}()
return final
}
}
Log these counters every 10 seconds to a structured logger with the stage name attached. When something goes wrong, the diff in the numbers tells you the story before you open a trace viewer.
Tracing is a layer up — OpenTelemetry spans per stage are useful once your pipeline has more than three stages or crosses process boundaries. Start with counters. Add tracing when you need to understand per-record latency distribution.
Context Cancellation: Drain vs Drop
On shutdown, you have a choice: drain in-flight records (let the pipeline finish what it has) or drop them (cancel and move on). Both are legitimate. Pick per pipeline.
- Drain: stop feeding the extractor, let the rest of the pipeline flush. Graceful, and required for loaders with transactional commits. Needs a shutdown signal separate from the pipeline's `cancel()`.
- Drop: the pipeline's `cancel()` propagates through every `select`, stages exit immediately, and in-flight records are lost. Appropriate for idempotent pipelines that will be replayed.
Don’t try to do both in the same pipeline. Name the policy, document it, make it the contract.
The drain pattern looks like two contexts: a `runCtx` for the pipeline itself, and a `shutdownCtx` that the extractor watches to know when to stop emitting. When `shutdownCtx` cancels, the extractor closes its output channel, and the pipeline naturally drains to completion.
What I’d Actually Choose
If I’m starting a new ETL pipeline in Go today:
Architecture: channel-based streaming stages with a single Stage signature. Fixed worker pools per stage, sized deliberately. Bounded channels for backpressure. No generic pipeline framework — compose stages directly. Framework abstractions get in the way more than they help.
Concurrency: golang.org/x/sync/errgroup with SetLimit for bounded fan-out. context.Context threaded through everything. Every send and receive in a select with ctx.Done(). No exceptions.
Extraction: stream with json.Decoder and csv.Reader, never io.ReadAll. Paginate APIs. Use native client libraries for message queues (Kafka, Pub/Sub) — don’t reinvent.
Reliability: circuit breaker around every external dependency, retry with exponential backoff + jitter inside the breaker. Max 3 attempts. Poison records flow to a dead-letter sink, they don’t crash the stage.
Loaders: idempotent by construction. Upsert by stable key, or watermark-based resumption. Batch writes, one transaction per batch, prepared statements. Never one INSERT per record.
Observability: atomic counters per stage for in/out/errors, logged every 10s. Add OpenTelemetry tracing once you have more than three stages. Never log the raw record body — structured fields with redaction at the sink.
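One way to read "idempotent by construction" from the loader bullet above: replaying the same batch converges on the same rows. A sketch, with an illustrative Postgres-style upsert statement (table and column names are made up) and an in-memory map standing in for the database so the property is visible without a driver:

```go
package main

import "fmt"

// upsertSQL is what upsert-by-stable-key looks like in Postgres terms:
// the stable key is id, and a replayed batch overwrites rather than
// duplicates. Illustrative only — adapt the conflict target to your schema.
const upsertSQL = `
INSERT INTO users (id, email) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email`

// store mimics that semantics in memory: last write per key wins.
type store map[string]string

func (s store) upsert(id, email string) { s[id] = email }

func main() {
	s := store{}
	batch := [][2]string{{"u1", "a@x.com"}, {"u2", "b@x.com"}}
	for i := 0; i < 2; i++ { // simulate a replay after a crash
		for _, r := range batch {
			s.upsert(r[0], r[1])
		}
	}
	fmt.Println(len(s)) // prints 2: the replay did not duplicate rows
}
```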
The biggest mistake I see teams make: building a “pipeline framework” before they’ve built three pipelines. Write the ugly, concrete version first. Pull out the shape only after you’ve seen the same shape three times. The framework that emerges from three real pipelines is worth more than the one you designed up front, because it defends against failures you’ve actually lived through, not the ones you imagined.