The EFK stack combines Elasticsearch, Fluentd, and Kibana into an integrated logging platform for OpenShift environments, covering the complete log-management workflow from collection through processing to visualization. This open-source logging solution provides enterprise-grade capabilities with a scalable architecture and flexible configuration for comprehensive container-platform observability.
The EFK stack architecture implements pipeline-based log processing: Fluentd forms the collection layer, Elasticsearch the storage and indexing layer, and Kibana the visualization and analysis layer. This modular design allows each component to be scaled independently and supports flexible deployment topologies.
[Diagram: EFK stack architecture with data flow between Fluentd, Elasticsearch, and Kibana]
The interaction patterns between the EFK components define the data flow, including configurable buffering and error-handling mechanisms. This inter-component communication keeps log processing reliable even during component failures or network problems.
Data flow overview:

1. Fluentd collects logs from various sources (pods, nodes, system services).
2. Fluentd pipelines transform and enrich the log data.
3. Elasticsearch indexes and stores the processed logs.
4. Kibana provides the search and visualization interfaces.
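Once the stack is deployed (see the operator-based setup below), this flow can be traced end to end with a quick smoke test. A minimal sketch, assuming the oc CLI, an Elasticsearch endpoint reachable at $ES_URL, and credentials in $ES_USER/$ES_PASS (all names illustrative):

# Step 1: emit a recognizable log line from a throwaway pod
oc run efk-smoke-test --image=busybox --restart=Never -- echo "efk-smoke-test-marker"

# Steps 2-3: Fluentd tails /var/log/containers and ships the line to
# Elasticsearch; after the flush interval it becomes searchable:
curl -sk -u "$ES_USER:$ES_PASS" \
  "$ES_URL/app-logs-*/_search?q=message:efk-smoke-test-marker&size=1&pretty"

# Step 4: the same query can be run interactively in Kibana's Discover view.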
Operator-based deployment automates EFK stack installation and management through Kubernetes operators:
# OpenShift Logging Operator
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: cluster-logging
  namespace: openshift-logging
spec:
  channel: stable
  name: cluster-logging
  source: redhat-operators
  sourceNamespace: openshift-marketplace
---
# ClusterLogging Custom Resource
apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
  name: instance
  namespace: openshift-logging
spec:
  managementState: Managed
  logStore:
    type: elasticsearch
    retentionPolicy:
      application:
        maxAge: 7d
      infra:
        maxAge: 7d
      audit:
        maxAge: 7d
    elasticsearch:
      nodeCount: 3
      storage:
        storageClassName: fast-ssd
        size: 100G
      resources:
        requests:
          memory: 4Gi
          cpu: 1
        limits:
          memory: 4Gi
          cpu: 2
  visualization:
    type: kibana
    kibana:
      replicas: 1
      resources:
        requests:
          memory: 1Gi
          cpu: 500m
        limits:
          memory: 1Gi
          cpu: 1
  collection:
    logs:
      type: fluentd
      fluentd:
        resources:
          requests:
            memory: 1Gi
            cpu: 200m
          limits:
            memory: 1Gi
            cpu: 500m
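Whether the operator has reconciled everything can be checked with a few oc commands; a minimal sketch, assuming the default namespace and resource names from the manifests above:

oc get subscription cluster-logging -n openshift-logging    # operator subscription accepted?
oc get pods -n openshift-logging                            # Elasticsearch, Kibana, and Fluentd pods Running?
oc get clusterlogging instance -n openshift-logging -o yaml # component status as reported by the operator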
The Elasticsearch cluster architecture implements a distributed search and analytics engine with node-role specialization for master, data, and coordination functions.

# Elasticsearch Custom Resource
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: logging-cluster
  namespace: elastic-system
spec:
  version: 8.5.0
  nodeSets:
  # Master nodes
  - name: masters
    count: 3
    config:
      node.roles: ["master"]
      xpack.security.enabled: true
      # Note: ECK handles cluster bootstrapping itself;
      # cluster.initial_master_nodes must not be set manually.
    podTemplate:
      spec:
        containers:
        - name: elasticsearch
          resources:
            requests:
              memory: 2Gi
              cpu: 1
            limits:
              memory: 2Gi
              cpu: 2
          env:
          - name: ES_JAVA_OPTS
            value: "-Xms1g -Xmx1g"
  # Data nodes
  - name: data
    count: 3
    config:
      node.roles: ["data", "ingest"]
      xpack.security.enabled: true
    podTemplate:
      spec:
        containers:
        - name: elasticsearch
          resources:
            requests:
              memory: 8Gi
              cpu: 2
            limits:
              memory: 8Gi
              cpu: 4
          env:
          - name: ES_JAVA_OPTS
            value: "-Xms4g -Xmx4g"
    volumeClaimTemplates:
    - metadata:
        name: elasticsearch-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 500Gi
        storageClassName: fast-ssd

Index management implements time-based index creation with automatic index rollover:
# Index template for application logs
PUT _index_template/app-logs-template
{
  "index_patterns": ["app-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.refresh_interval": "30s",
      "index.codec": "best_compression",
      "index.query.default_field": [
        "message", "kubernetes.namespace", "kubernetes.pod"
      ]
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "level": {
          "type": "keyword"
        },
        "message": {
          "type": "text",
          "analyzer": "standard",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "kubernetes": {
          "properties": {
            "namespace": {"type": "keyword"},
            "pod": {"type": "keyword"},
            "container": {"type": "keyword"},
            "node": {"type": "keyword"}
          }
        }
      }
    }
  }
}
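Before any data arrives, the template can be inspected and dry-run against a concrete index name via the index-template APIs; a sketch with placeholder endpoint and credentials:

# Show the stored template
curl -sk -u "elastic:$ES_PASSWORD" "$ES/_index_template/app-logs-template?pretty"
# Simulate the settings/mappings a new index would receive (index name illustrative)
curl -sk -u "elastic:$ES_PASSWORD" -X POST "$ES/_index_template/_simulate_index/app-logs-2024.05.01?pretty"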
# ILM policy for log retention
PUT _ilm/policy/app-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "5GB",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "2d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "allocate": {
            "number_of_replicas": 0
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0
          },
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
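Whether indices actually progress through these phases can be followed with the ILM APIs; endpoint and credentials below are placeholders:

curl -sk -u "elastic:$ES_PASSWORD" "$ES/_ilm/policy/app-logs-policy?pretty"   # stored policy
curl -sk -u "elastic:$ES_PASSWORD" "$ES/app-logs-*/_ilm/explain?pretty"       # current phase per index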
| Node type | Shards per node | Heap size | Usage |
|---|---|---|---|
| Master | 0 (no data shards) | 1-2 GB | Cluster management |
| Data | 10-25 | 4-8 GB | Log storage and search |
| Ingest | 0-5 | 2-4 GB | Log processing |
| Coordinator | 0 | 1-2 GB | Query coordination |
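These sizing targets can be compared against a live cluster with the _cat APIs, for example:

curl -sk -u "elastic:$ES_PASSWORD" "$ES/_cat/nodes?v&h=name,node.role,heap.percent,heap.max"
curl -sk -u "elastic:$ES_PASSWORD" "$ES/_cat/allocation?v"   # shard count and disk usage per node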
Fluentd acts as the central log collection and processing layer in the EFK stack.
# Fluentd configuration for Kubernetes logs
<source>
  @type tail
  @id container_logs
  path "/var/log/containers/*.log"
  pos_file "/var/log/fluentd-containers.log.pos"
  tag kubernetes.*
  read_from_head true
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key time
      time_format %Y-%m-%dT%H:%M:%S.%NZ
    </pattern>
    <pattern>
      format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
      time_format %Y-%m-%dT%H:%M:%S.%N%:z
    </pattern>
  </parse>
</source>

# System journal logs
<source>
  @type systemd
  @id systemd-input
  tag systemd
  path /var/log/journal
  <storage>
    @type local
    persistent true
    path /var/log/fluentd-systemd.pos
  </storage>
  <entry>
    fields_strip_underscores true
    # field_map is a hash parameter and must stay on a single line
    field_map {"MESSAGE": "message", "_SYSTEMD_UNIT": "systemd_unit", "_PID": "systemd_pid"}
  </entry>
</source>

# Kubernetes metadata filter
<filter kubernetes.**>
  @type kubernetes_metadata
  @id kubernetes_metadata
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  preserve_json_log true
  merge_json_log true
  de_dot false
  annotation_match [ ".*" ]
</filter>

# Log enrichment filter
<filter kubernetes.**>
  @type record_transformer
  @id kubernetes_record_transformer
  enable_ruby true
  <record>
    cluster_name "#{ENV['CLUSTER_NAME'] || 'openshift-cluster'}"
    log_level ${record.dig("kubernetes", "labels", "log-level") || "info"}
    app_name ${record.dig("kubernetes", "labels", "app") || "unknown"}
    environment ${record.dig("kubernetes", "namespace_labels", "environment") || "unknown"}
  </record>
  # record_accessor syntax ($.) removes the nested key
  remove_keys $.kubernetes.pod_id
</filter>

# Multi-line handling for Java stack traces
<filter kubernetes.**>
  @type concat
  key log
  stream_identity_key stream
  multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
  separator ""
  # requires a matching <label @NORMAL> section that re-emits flushed records
  timeout_label @NORMAL
</filter>

# PII scrubbing filter
<filter **>
  @type record_modifier
  <record>
    # Chain both substitutions in one expression; a duplicate "message" key
    # would let the last assignment win and drop the first masking step
    message ${record["message"]&.gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, "****-****-****-****")&.gsub(/\b[\w\.-]+@[\w\.-]+\.\w+\b/, "***@***.***")}
  </record>
</filter>

# Elasticsearch output
<match **>
  @type elasticsearch
  @id elasticsearch_output
  host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
  port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
  scheme https
  ssl_verify true
  user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
  password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
  logstash_format true
  logstash_prefix app-logs
  logstash_dateformat %Y.%m.%d
  include_tag_key true
  tag_key fluentd_tag
  <buffer>
    @type file
    path /var/log/fluentd-buffers/kubernetes.system.buffer
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 5s
    retry_forever
    retry_max_interval 30
    chunk_limit_size 2M
    queue_limit_length 8
    overflow_action block
  </buffer>
</match>
# Error handling
<label @ERROR>
  <match **>
    @type file
    @id error_output
    path /var/log/fluentd-error.log
    append true
    <buffer>
      flush_mode immediate
    </buffer>
  </match>
</label>
| Buffer type | Use case | Advantages | Disadvantages |
|---|---|---|---|
| Memory | High throughput | Fast, low latency | Data loss on crash |
| File | Reliability | Persistence, no data loss | Slower I/O |
| Hybrid | Balance | Speed + safety | More complex configuration |
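Buffer pressure is observable at runtime. A sketch, assuming Fluentd's optional monitor_agent input is enabled (a <source> block with @type monitor_agent on port 24220) and jq is available:

# Queued chunks and buffered bytes per output plugin
curl -s http://localhost:24220/api/plugins.json | \
  jq '.plugins[] | {plugin_id, buffer_queue_length, buffer_total_queued_size, retry_count}'
# File buffers can also be checked directly on disk
du -sh /var/log/fluentd-buffers/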
Kibana provides the user interface for log search, analysis, and visualization.
# Create an index pattern via the Kibana API
curl -X POST "kibana:5601/api/saved_objects/index-pattern/app-logs-*" \
  -H "Content-Type: application/json" \
  -H "kbn-xsrf: true" \
  -d '{
    "attributes": {
      "title": "app-logs-*",
      "timeFieldName": "@timestamp",
      "fields": "[{\"name\":\"@timestamp\",\"type\":\"date\",\"searchable\":true,\"aggregatable\":true},{\"name\":\"level\",\"type\":\"string\",\"searchable\":true,\"aggregatable\":true},{\"name\":\"message\",\"type\":\"string\",\"searchable\":true,\"aggregatable\":false}]"
    }
  }'

# Kibana Query DSL for log searches
# Error logs from the last 24 hours
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "level": "ERROR"
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": "now-24h"
            }
          }
        }
      ],
      "filter": [
        {
          "term": {
            "kubernetes.namespace.keyword": "production"
          }
        }
      ]
    }
  },
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}
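The same Query DSL runs unchanged against Elasticsearch itself, which helps when debugging outside Kibana; a sketch, with the query above saved as query.json and placeholder endpoint/credentials:

curl -sk -u "elastic:$ES_PASSWORD" -X POST "$ES/app-logs-*/_search?pretty" \
  -H "Content-Type: application/json" -d @query.json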
# Kibana visualization for log-level distribution
{
  "version": "8.5.0",
  "objects": [
    {
      "id": "log-level-pie",
      "type": "visualization",
      "attributes": {
        "title": "Log Level Distribution",
        "visState": {
          "type": "pie",
          "params": {
            "addTooltip": true,
            "addLegend": true,
            "legendPosition": "right"
          },
          "aggs": [
            {
              "id": "1",
              "type": "count",
              "schema": "metric",
              "params": {}
            },
            {
              "id": "2",
              "type": "terms",
              "schema": "segment",
              "params": {
                "field": "level.keyword",
                "size": 10,
                "order": "desc",
                "orderBy": "1"
              }
            }
          ]
        },
        "kibanaSavedObjectMeta": {
          "searchSourceJSON": {
            "index": "app-logs-*",
            "query": {
              "match_all": {}
            },
            "filter": []
          }
        }
      }
    }
  ]
}

# Complete dashboard for application monitoring
{
  "version": "8.5.0",
  "objects": [
    {
      "id": "app-monitoring-dashboard",
      "type": "dashboard",
      "attributes": {
        "title": "Application Monitoring Dashboard",
        "panelsJSON": [
          {
            "gridData": {"x": 0, "y": 0, "w": 24, "h": 15},
            "panelIndex": "1",
            "version": "8.5.0",
            "panelRefName": "panel_1"
          },
          {
            "gridData": {"x": 24, "y": 0, "w": 24, "h": 15},
            "panelIndex": "2",
            "version": "8.5.0",
            "panelRefName": "panel_2"
          }
        ],
        "timeRestore": true,
        "timeTo": "now",
        "timeFrom": "now-24h",
        "refreshInterval": {
          "pause": false,
          "value": 30000
        }
      }
    }
  ]
}
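In practice, saved objects like these are kept in version control and loaded through Kibana's import API rather than recreated by hand; a hedged sketch (Kibana 8.x expects NDJSON, one saved object per line; the file name is illustrative):

curl -sk -X POST "kibana:5601/api/saved_objects/_import?overwrite=true" \
  -H "kbn-xsrf: true" \
  --form file=@dashboard.ndjson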
Advanced log processing improves the searchability and analytical value of log data.

# Fluentd normalization across different applications
<filter kubernetes.**>
  @type record_transformer
  enable_ruby true
  <record>
    # Normalize log levels (placeholder expressions must stay on one line)
    normalized_level ${case record["level"]&.downcase; when "warn", "warning" then "WARNING"; when "err", "error", "fatal" then "ERROR"; when "debug", "trace" then "DEBUG"; when "info", "information" then "INFO"; else (record["level"]&.upcase || "UNKNOWN"); end}
    # Extract HTTP status codes from the message
    http_status ${record["message"] =~ /HTTP.*?(\d{3})/ ? $1.to_i : nil}
    # Parse response times in milliseconds
    response_time_ms ${record["message"] =~ /(\d+(?:\.\d+)?)ms/ ? $1.to_f : nil}
  </record>
</filter>

# GeoIP enrichment for IP addresses
<filter **>
  @type geoip
  # Database path and lookup key; the placeholder syntax below follows
  # fluent-plugin-geoip's geoip2_compat backend
  backend_library geoip2_compat
  geoip2_database "/etc/fluentd/GeoLite2-City.mmdb"
  geoip_lookup_keys client_ip
  <record>
    geoip_city ${city["client_ip"]}
    geoip_country ${country_name["client_ip"]}
    geoip_latitude ${latitude["client_ip"]}
    geoip_longitude ${longitude["client_ip"]}
  </record>
  remove_keys client_ip
</filter>

# User-agent parsing for HTTP logs
<filter nginx.**>
  @type parser
  key_name message
  reserve_data true
  <parse>
    @type regexp
    expression /^(?<remote_addr>[^ ]*) .* \[(?<time_local>[^\]]*)\] "(?<method>[^ ]*) (?<request>[^ ]*) (?<protocol>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"$/
    time_format %d/%b/%Y:%H:%M:%S %z
  </parse>
</filter>

<filter nginx.**>
  @type ua_parser
  key_name http_user_agent
  out_key ua
  flatten true
</filter>

Large EFK deployments require dedicated performance optimizations.
# Optimized Elasticsearch configuration
elasticsearch.yml: |
  cluster.name: logging-cluster
  # JVM heap sizing (50% of available RAM, max 32GB)
  # Set via ES_JAVA_OPTS: -Xms4g -Xmx4g
  # Thread pool settings
  thread_pool:
    write:
      size: 4
      queue_size: 1000
    search:
      size: 6
      queue_size: 1000
  # Index settings (merged into a single block; duplicate YAML keys are invalid)
  indices:
    memory:
      index_buffer_size: 20%
    fielddata:
      cache:
        size: 15%
    queries:
      cache:
        size: 15%
  # Network settings
  network:
    host: 0.0.0.0
    tcp:
      keep_alive: true
      reuse_address: true
  # Discovery settings (discovery.zen.minimum_master_nodes was removed in
  # Elasticsearch 7.x; quorum handling is automatic since then)
  discovery:
    seed_hosts: ["master-0", "master-1", "master-2"]
  # Performance settings
  bootstrap:
    memory_lock: true

# Optimized Fluentd worker configuration
<system>
  workers 4
  root_dir /var/log/fluentd
  log_level info
  suppress_repeated_stacktrace true
</system>

# Optimized buffer settings
<match **>
  @type elasticsearch
  <buffer>
    @type file
    path /var/log/fluentd-buffers/elasticsearch
    # Flush settings
    flush_mode interval
    flush_interval 5s
    flush_thread_count 4
    # Chunk settings
    chunk_limit_size 8MB
    chunk_limit_records 10000
    # Queue settings
    queue_limit_length 64
    overflow_action drop_oldest_chunk
    # Retry settings (retry_forever would override retry_timeout;
    # a bounded retry window fits drop_oldest_chunk better)
    retry_type exponential_backoff
    retry_wait 1s
    retry_max_interval 60s
    retry_timeout 1h
  </buffer>
</match>

# Elasticsearch index optimization
PUT /app-logs-*/_settings
{
  "index": {
    "refresh_interval": "30s",
    "number_of_replicas": 0,
    "translog": {
      "flush_threshold_size": "1gb",
      "sync_interval": "30s"
    },
    "merge": {
      "policy": {
        "max_merged_segment": "5gb"
      }
    }
  }
}

# Force-merge older indices
POST /app-logs-2024.01.*/_forcemerge?max_num_segments=1&wait_for_completion=false

EFK stack security covers authentication, authorization, and encryption.
# Elasticsearch security settings
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: elastic-certificates.p12

# Built-in users
PUT /_security/user/kibana_system/_password
{
  "password": "kibana_system_password"
}
PUT /_security/user/elastic/_password
{
  "password": "elastic_password"
}

# Elasticsearch role for namespace-scoped access
PUT /_security/role/logs_reader_production
{
  "indices": [
    {
      "names": ["app-logs-*"],
      "privileges": ["read"],
      "query": {
        "term": {
          "kubernetes.namespace.keyword": "production"
        }
      }
    }
  ]
}

# User with restricted permissions
PUT /_security/user/prod_developer
{
  "password": "secure_password",
  "roles": ["logs_reader_production"],
  "full_name": "Production Developer",
  "email": "prod.dev@company.com"
}
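The document-level security is easy to verify: searches as the restricted user silently drop everything outside the production namespace. A sketch using the placeholder credentials from above:

# Should return no hits: the role's query clause filters out
# non-production documents before they reach the user
curl -sk -u "prod_developer:secure_password" \
  "$ES/app-logs-*/_search?q=kubernetes.namespace.keyword:staging&size=1&pretty"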
# Sensitive-data masking in Fluentd
<filter **>
  @type record_modifier
  <replace>
    key message
    expression /password[=:]\s*\S+/i
    replace password=***MASKED***
  </replace>
  <replace>
    key message
    expression /token[=:]\s*[\w\-\.]+/i
    replace token=***MASKED***
  </replace>
  <replace>
    key message
    expression /\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/
    replace ****-****-****-****
  </replace>
</filter>

The EFK stack supports advanced alerting based on log content.
# Watcher for a high error rate
PUT _watcher/watch/high_error_rate
{
  "trigger": {
    "schedule": {
      "interval": "2m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": ["app-logs-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "level": "ERROR"
                  }
                },
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-5m"
                    }
                  }
                }
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 50
      }
    }
  },
  "actions": {
    "send_email": {
      "email": {
        "to": ["ops-team@company.com"],
        "subject": "High Error Rate Alert",
        "body": "{{ctx.payload.hits.total}} errors detected in the last 5 minutes"
      }
    },
    "create_incident": {
      "webhook": {
        "scheme": "https",
        "host": "pagerduty.com",
        "port": 443,
        "method": "post",
        "path": "/integration/{{ctx.metadata.pagerduty_key}}/enqueue",
        "body": "{\"service_key\": \"{{ctx.metadata.pagerduty_key}}\", \"event_type\": \"trigger\", \"description\": \"High error rate: {{ctx.payload.hits.total}} errors\"}"
      }
    }
  }
}
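A watch can be dry-run without waiting for its schedule; the execute API returns the search payload and the actions that would have fired (placeholder endpoint and credentials):

curl -sk -u "elastic:$ES_PASSWORD" -X POST "$ES/_watcher/watch/high_error_rate/_execute?pretty"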
# Machine-learning job for log anomaly detection
PUT _ml/anomaly_detectors/log-rate-anomaly
{
  "description": "Detect anomalies in log rate",
  "analysis_config": {
    "bucket_span": "5m",
    "detectors": [
      {
        "function": "count",
        "detector_description": "Count of log messages"
      }
    ],
    "influencers": [
      "kubernetes.namespace.keyword",
      "level.keyword"
    ]
  },
  "data_description": {
    "time_field": "@timestamp"
  }
}

# Datafeed for the ML job
PUT _ml/datafeeds/datafeed-log-rate-anomaly
{
  "job_id": "log-rate-anomaly",
  "indices": ["app-logs-*"],
  "query": {
    "match_all": {}
  }
}
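Anomaly-detection jobs are created in the closed state; the job and its datafeed have to be started explicitly (sketch, placeholder endpoint and credentials):

curl -sk -u "elastic:$ES_PASSWORD" -X POST "$ES/_ml/anomaly_detectors/log-rate-anomaly/_open"
curl -sk -u "elastic:$ES_PASSWORD" -X POST "$ES/_ml/datafeeds/datafeed-log-rate-anomaly/_start"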
For large organizations, multi-cluster log aggregation is essential.

# Fluentd configuration for multi-cluster forwarding
<source>
  @type forward
  @id forward_input
  port 24224
  bind 0.0.0.0
  <transport tls>
    cert_path /fluentd/ssl/server.crt
    private_key_path /fluentd/ssl/server.key
    client_cert_auth true
  </transport>
  <security>
    self_hostname central-fluentd
    shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
  </security>
</source>

# Output to the central Elasticsearch
<match cluster.**>
  @type elasticsearch
  host central-elasticsearch.logging.svc.cluster.local
  port 9200
  logstash_format true
  logstash_prefix multi-cluster-logs
  <buffer>
    @type file
    path /var/log/fluentd-buffers/multi-cluster.buffer
    flush_interval 10s
    chunk_limit_size 8MB
  </buffer>
</match>

# Cross-cluster search in Elasticsearch
PUT /_cluster/settings
{
  "persistent": {
    "cluster.remote.cluster_one.seeds": ["cluster1-master:9300"],
    "cluster.remote.cluster_two.seeds": ["cluster2-master:9300"]
  }
}

# Search across clusters
GET /local-logs-*,cluster_one:logs-*,cluster_two:logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        {"match": {"level": "ERROR"}},
        {"range": {"@timestamp": {"gte": "now-1h"}}}
      ]
    }
  }
}
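Remote connectivity is worth confirming before relying on cross-cluster queries; the _remote/info API reports the connection state per configured cluster (sketch, placeholder endpoint and credentials):

curl -sk -u "elastic:$ES_PASSWORD" "$ES/_remote/info?pretty"   # expect "connected": true for each remote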