A Go CLI for end-to-end loading of JSON documents into Elasticsearch-compatible clusters.
es-bulk-loader can manage index creation, settings, mappings, ingest pipelines, enrich policies, transforms,
document loading, and optional enrich execution as one repeatable CLI or CI workflow.
- Full index lifecycle support: create, replace data, rebuild, and destructive cleanup
- Managed Elasticsearch resources: settings, mappings, ingest pipelines, enrich policies, and transforms
- Keyed JSON definitions for multiple pipelines or policies in a single file
- Automatic default-pipeline selection from the first declared pipeline when settings leave it unset
- Bulk JSON array or NDJSON loading with configurable batch sizes and optional
_idoverride - Enrich execution after load, including source-index refresh before policy execution
- Transform upsert/start lifecycle with source-index filtering from config
- Mapping preflight guard that validates declared root field types and
date_detectionbefore bulk inserts - Practical input normalization for wrapped settings, nested
indexsettings, andindex.*keys - Safe-by-default policy deletion, with an explicit destructive override for dependent pipeline cleanup
- Graceful handling for clusters that do not expose enrich APIs
go build -o es-bulk-loader ./cmd/es-bulk-loaderdocker run --rm jnovack/es-bulk-loader:latest \
-url https://localhost:9200 \
-insecureSkipVerify=true \
-index my-index \
-settings settings.json \
-mappings mappings.json \
-pipelines pipelines.json \
-policies policies.json \
-data data.json \
-batch 500 \
-delete \
-sync-managed \
-enrichgo run ./cmd/es-bulk-loader \
-url https://localhost:9200 \
-insecureSkipVerify=true \
-index my-index \
-settings settings.json \
-mappings mappings.json \
-pipelines pipelines.json \
-policies policies.json \
-data data.json \
-batch 500 \
-delete \
-sync-managed \
-enrichSample config file: examples/es-bulk-loader.conf
es-bulk-loader can now be called directly in-process from Go via pkg/loader.
Full lifecycle run:
package main
import (
"context"
"log"
loaderpkg "github.com/jnovack/es-bulk-loader/pkg/loader"
)
func main() {
result, err := loaderpkg.Run(context.Background(), loaderpkg.Options{
URL: "http://localhost:9200",
Index: "slugs",
SettingsFile: "./examples/slugs/settings.json",
MappingsFile: "./examples/slugs/mappings.json",
PipelinesFile: "./examples/slugs/pipelines.json",
PoliciesFile: "./examples/slugs/policies.json",
DataFile: "./examples/slugs/slugs.json",
DeleteIndex: true,
SyncManaged: true,
Enrich: loaderpkg.EnrichOptions{
Enabled: true,
All: true,
},
})
if err != nil {
log.Fatal(err)
}
log.Printf("index=%s processed=%d succeeded=%d failed=%d warnings=%d",
result.WriteIndex,
result.DocumentsProcessed,
result.DocumentsSucceeded,
result.DocumentsFailed,
len(result.Warnings),
)
}Targeted helpers are also available for orchestration:
loader.SyncManaged(ctx, opts)loader.LoadData(ctx, opts)loader.ExecuteEnrich(ctx, opts)
Sentinel error classes are exposed for classification:
loader.ErrInvalidOptionsloader.ErrIndexOperationloader.ErrManagedResourceloader.ErrBulkFailureloader.ErrEnrichExecutionloader.ErrLoaderExecution
Unit tests stay in the default Go test path:
go test ./...Canonical E2E command (also available as make test-e2e) uses the tagged suite entrypoint:
go test -tags=e2e ./test -run TestEndToEndScenariosVerbose E2E output:
go test -tags=e2e ./test -run TestEndToEndScenarios -vIf you see build constraints exclude all Go files for ./test, the run is missing -tags=e2e.
The E2E package is intentionally build-tagged.
Makefile shortcuts:
make test-unit
make test-e2e
make test-e2e-verboseThe loader now separates three concerns:
-nukeremoves the current index and declared managed resources first.- One optional data action, exactly one of
-add,-flush, or-delete, controls how documents are handled. -sync-managedindependently creates or updates declared pipelines, policies, and transforms.
When combined, the execution order is:
- Run
-nukefirst, if requested. - Run the selected data action, if any (one of
-add,-flush, or-delete). - Create required ingest pipelines for index creation when
index.default_pipelineis configured. - Create the index when needed, applying settings and mappings.
- Bulk load data, if a data action was selected.
- Update alias pointers, when alias mode creates a new concrete index.
- Create or update enrich policies during
-sync-managedonce the source index/alias is ready. - Refresh the source index and execute enrich policies when
-enrichis requested. - Create or update selected transforms during
-sync-managed. - Start selected transforms after enrich execution (or immediately after transform upsert when enrich is disabled).
That ordering matters. The loader is intentionally opinionated so CI runs and operator workflows stay predictable.
When -mappings is provided, the loader derives a pre-bulk validation plan
from the declared mapping and validates effective index mappings before loading
documents:
- Optional
mappings.date_detectionis checked when explicitly declared. - Declared top-level field
typemappings are verified against effective index mappings.
This fails early when effective mappings drift from declared mapping intent.
Settings can be loaded from a configuration file (e.g. -config es-bulk-loader.conf), the environment,
or from the command-line.
| Flag | Description |
|---|---|
-config |
Path to configuration file with settings |
-url |
Endpoint URL (e.g., http://localhost:9200) |
-insecureSkipVerify |
Skip TLS verification for HTTPS |
-index |
Target index name (required) |
-alias |
Treat -index as an alias and create timestamped indices as <alias>-YYYYMMDDHHMMSS when creating a new index |
-keep-last |
With -alias, keep only the newest N timestamped indices matching <alias>-YYYYMMDDHHMMSS (default: 0, disabled) |
-settings |
Optional path to JSON file with index settings |
-mappings |
Optional path to JSON file with index mappings |
-pipelines |
Optional path to JSON file with one or more ingest pipeline definitions |
-policies |
Optional path to JSON file with one or more enrich policy definitions |
-transforms |
Optional path to JSON file with one or more transform definitions |
-batch |
Number of documents per bulk insert (default: 1000) |
-bulk-retry-attempts |
Total bulk API request attempts for retryable failures, including the first attempt (default: 4) |
-bulk-retry-backoff-base |
Base backoff duration for retryable bulk failures (default: 500ms) |
-bulk-retry-backoff-max |
Maximum backoff duration for retryable bulk failures (default: 5s) |
-add |
Append data to an existing index or create the index first if it does not exist |
-flush |
Delete all documents from an existing index without deleting the index, then load replacement data |
-delete |
Recreate data target before loading data: deletes concrete index in normal mode; rolls alias to a new timestamped index in -alias mode |
-data |
Path to a JSON array or NDJSON document file to load (required with -add, -flush, or -delete) |
-sync-managed |
Create or update declared ingest pipelines, enrich policies, and transforms |
-nuke |
Delete the current index and declared managed resources first, including dependent pipelines that reference declared enrich policies |
-id |
Field to use in the document to override _id (default: not set) |
-enrich |
Run enrich policies after the bulk insert; omit value for all or pass a comma-separated list |
-user / -pass |
Username and password for Basic Auth |
-apiKey |
Elasticsearch API key |
-level |
Log level filter: trace, debug, info, warn, or error (default: info) |
-version |
Print version and exit |
-add, -flush, and -delete are mutually exclusive.
| Flags | Effect |
|---|---|
-add |
Append data to an existing index, or create the index and load data if it does not exist |
-flush |
Remove existing documents, keep the existing index structure, then load replacement data |
-delete |
Recreate data target and reload data: concrete index is deleted/recreated in normal mode; alias mode creates a new timestamped index and repoints alias |
-sync-managed |
Create or update declared pipelines, policies, and transforms without changing document data by itself |
-nuke |
Remove the current index and declared managed resources first; if combined with another action, that action runs afterward |
-alias |
Treat -index as an alias and use timestamped concrete index names (<alias>-YYYYMMDDHHMMSS) when a new index is created |
-keep-last |
With -alias, prune older timestamped concrete indices after the run and keep only the newest N by parsed timestamp suffix |
Common combinations:
-add -sync-managed: append documents and ensure declared pipelines, policies, and transforms exist-flush -sync-managed: replace documents, keep settings and mappings, and ensure declared pipelines, policies, and transforms exist-delete -sync-managed: rebuild the index from scratch and recreate declared pipelines, policies, and transforms-nuke -delete -sync-managed: force a full teardown first, then rebuild cleanly-nuke: remove the current index and declared managed resources without loading new data-delete -alias -keep-last 2: roll to a new timestamped index, repoint alias, then keep only the newest two generations
Two formats are accepted. Format is detected automatically from the first non-whitespace byte; no flag is required.
JSON array (original format):
[
{ "id": "1", "name": "Alice" },
{ "id": "2", "name": "Bob" }
]NDJSON (newline-delimited JSON, one object per line):
{"id":"1","name":"Alice"}
{"id":"2","name":"Bob"}
NDJSON is preferred for large datasets. It enables streaming decode with O(1) memory per document and is the canonical format for ES bulk-API interchange.
When -alias is set, -index is interpreted as an Elasticsearch alias name.
The loader uses Elasticsearch terms in logs:
alias: logical name provided by-index(for examplecards)index: concrete index name (for examplecards-20260319130459)
Concrete index names created by alias mode always use a numeric suffix:
<alias>-YYYYMMDDHHMMSS
Data-action behavior in alias mode:
-delete: create a new timestamped index, load data, then point the alias to the new index (older timestamped indices can be pruned with-keep-last)-flush: flush documents from current alias target indices, then load replacement data-add: append to current alias target index; if the alias has no index yet, create a first timestamped index and load data
-keep-last only applies when -alias is enabled. It parses and sorts timestamped concrete index names, then deletes older ones so only the newest N remain.
In alias mode, -delete keeps existing managed resources (pipelines/policies/transforms) and relies on -sync-managed to upsert definitions; use -nuke when you want destructive managed-resource cleanup.
If -delete -alias is run without -sync-managed, the loader logs a warning and assumes -sync-managed for that run. Add -sync-managed explicitly in automation for clarity.
If -delete -alias is run without -keep-last, the loader logs a warning that no old generations were deleted and storage use can grow over time.
Example progression with -keep-last 2:
- Run 1: keep
cards-20260319130000 - Run 2: keep
cards-20260319130000,cards-20260319130500 - Run 3: create
cards-20260319131000, then prune oldest so remaining arecards-20260319130500,cards-20260319131000
Use -enrich after a bulk load when enrich policy backing indices need to be rebuilt.
When -pipelines and -policies are supplied, the loader imports those definitions as part of the run:
-sync-managedcreates pipelines, attempts to create declared enrich policies, and upserts matching transforms.- Declared enrich policies are managed by content hash during
-sync-managed: each logical policy key is resolved to<logical>-<sha256[:6]>. - If a policy definition is unchanged, the same managed policy name is reused.
- If a policy definition changes, a new managed policy name is created and pipelines are rewritten to reference it.
- Old managed policy versions are garbage-collected when they are unreferenced by any pipeline.
-deletefails loudly if a declared enrich policy is still referenced by another ingest pipeline. This is the safe default because those references may belong to another index workflow.-nukekeeps deleting through that conflict by finding and deleting ingest pipelines that reference the declared enrich policy. If one of those pipelines is still configured asindex.default_pipelineon another index, the loader clears that setting on the dependent index before retrying the pipeline delete.-addand-flushaffect documents only. They do not modify pipelines, policies, or transforms unless-sync-managedis also set.
Caution
Use -nuke carefully. Unlike the normal one-index workflow, it can remove ingest pipelines outside the current index's declared pipeline file and clear index.default_pipeline on dependent indices if those pipelines reference a declared enrich policy you are deleting.
go run cmd/es-bulk-loader/main.go \
-url https://localhost:9200 \
-index my-index \
-sync-managed \
-data data.json \
-enrichWhen -enrich is provided without an explicit policy list and -policies is also provided, the loader executes the resolved managed policy names declared for that run. If no policy file is provided, it falls back to all enrich policies currently available in the cluster.
Run only specific policies:
go run cmd/es-bulk-loader/main.go \
-url https://localhost:9200 \
-index my-index \
-sync-managed \
-data data.json \
-enrich=policy-a,policy-bUnknown policy names are logged as warnings and skipped. When a policy file is provided, explicit logical policy names are mapped to the resolved managed policy names automatically. If the cluster does not expose the enrich APIs at all, the loader logs a warning and skips enrich policy create/delete/execute operations instead of aborting the whole load.
When -transforms is provided, definitions are read from a keyed JSON object:
- JSON key: transform ID.
source_index: logical source index name that must match the current-index.body: native Elasticsearch transform definition passed directly to the transform APIs.
Only transforms whose source_index matches the current -index are selected for the run.
During -sync-managed, selected transforms are stopped, upserted after data load and alias updates, and then started after enrich execution (or immediately after upsert when enrich is disabled).
During -nuke, selected transforms are force-deleted.
Definition file formats are documented in docs/PIPELINES.md and docs/POLICIES.md.
When -pipelines is supplied during index creation, the loader preserves the JSON key order from the pipeline file. If the settings file does not already define a default pipeline, the first declared pipeline becomes the index default pipeline automatically.
Definition files support variable expansion before they are parsed. ${INDEX} is populated from the current -index value (alias name when -alias is enabled), and other placeholders fall back to environment variables when present.
[
{ "id": 1, "name": "Alice" },
{ "id": 2, "name": "Bob" }
]{
"number_of_shards": 1,
"number_of_replicas": 1
}Wrapped settings are also accepted:
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
}
}Nested index settings are also accepted and normalized:
{
"settings": {
"index": {
"number_of_shards": 2,
"number_of_replicas": 1
}
}
}Dotted index.* keys are also accepted:
{
"index.number_of_shards": 2,
"index.number_of_replicas": 1,
"index.default_pipeline": "my-default-pipeline"
}All of these shapes are normalized into the create-index request body the loader sends to Elasticsearch.
{
"properties": {
"id": { "type": "integer" },
"name": { "type": "text" }
}
}Wrapped mappings are also accepted:
{
"mappings": {
"properties": {
"id": { "type": "integer" },
"name": { "type": "text" }
}
}
}- Go 1.25+
- Elasticsearch 7.x, 8.x, or 9.x (tested with v9 client) or Opensearch (does not support enrich policies)
MIT License © 2025-2026
This project is an independent, open-source tool and is not affiliated with, endorsed by, or sponsored by Elastic. “Elasticsearch” and “Elastic” are trademarks of Elasticsearch B.V.