44 EFK Stack (Elasticsearch, Fluentd, Kibana)

The EFK stack combines Elasticsearch, Fluentd, and Kibana into an integrated logging platform for OpenShift environments, implementing an end-to-end log management workflow from collection through processing to visualization. This open-source logging solution provides enterprise-grade capabilities with a scalable architecture and flexible configuration for comprehensive container platform observability.

44.1 Architectural Integration of the EFK Stack

The EFK stack implements pipeline-based log processing with Fluentd as the collection layer, Elasticsearch as the storage and indexing layer, and Kibana as the visualization and analysis layer. This modular architecture allows each component to be scaled independently and supports flexible deployment topologies.

[Diagram: EFK stack architecture with data flow between Fluentd, Elasticsearch, and Kibana]

44.1.1 Component Interaction

The interaction patterns between the EFK components define the data flow, with configurable buffering and error-handling mechanisms. This inter-component communication ensures reliable log processing even during component failures or network problems.

Data flow overview:
1. Fluentd collects logs from various sources (pods, nodes, system services).
2. Fluentd pipelines transform and enrich the log data.
3. Elasticsearch indexes and stores the processed logs.
4. Kibana provides search and visualization interfaces.
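The four stages above can be sketched as pure functions on a log record. This is an illustrative Python sketch only; the function and field names are hypothetical and do not correspond to real Fluentd or Elasticsearch APIs.

```python
# Hypothetical sketch of the four pipeline stages; none of these names
# are real Fluentd/Elasticsearch APIs.

def collect(raw_line: str) -> dict:
    """Stage 1: Fluentd tails a container log line (time stream flag log)."""
    time, stream, _flag, log = raw_line.split(" ", 3)
    return {"@timestamp": time, "stream": stream, "log": log}

def enrich(record: dict) -> dict:
    """Stage 2: a Fluentd pipeline adds Kubernetes metadata."""
    return {**record, "kubernetes": {"namespace": "production", "pod": "web-1"}}

def index(record: dict, store: list) -> None:
    """Stage 3: Elasticsearch indexes and stores the document."""
    store.append(record)

def search(store: list, namespace: str) -> list:
    """Stage 4: Kibana queries the indexed documents."""
    return [d for d in store if d["kubernetes"]["namespace"] == namespace]

es: list = []
index(enrich(collect("2024-01-15T10:00:00Z stdout F request served")), es)
print(len(search(es, "production")))  # → 1
```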

44.1.2 Operator-Based Deployment

Operator-based deployment automates EFK stack installation and management through Kubernetes operators:

# OpenShift Logging Operator
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: cluster-logging
  namespace: openshift-logging
spec:
  channel: stable
  name: cluster-logging
  source: redhat-operators
  sourceNamespace: openshift-marketplace

---
# ClusterLogging Custom Resource
apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
  name: instance
  namespace: openshift-logging
spec:
  managementState: Managed
  logStore:
    type: elasticsearch
    retentionPolicy:
      application:
        maxAge: 7d
      infra:
        maxAge: 7d
      audit:
        maxAge: 7d
    elasticsearch:
      nodeCount: 3
      storage:
        storageClassName: fast-ssd
        size: 100G
      resources:
        requests:
          memory: 4Gi
          cpu: 1
        limits:
          memory: 4Gi
          cpu: 2
  visualization:
    type: kibana
    kibana:
      replicas: 1
      resources:
        requests:
          memory: 1Gi
          cpu: 500m
        limits:
          memory: 1Gi
          cpu: 1
  collection:
    logs:
      type: fluentd
      fluentd:
        resources:
          requests:
            memory: 1Gi
            cpu: 200m
          limits:
            memory: 1Gi
            cpu: 500m

44.2 Elasticsearch Integration and Index Management

The Elasticsearch cluster architecture implements a distributed search and analytics engine with specialized node roles for master, data, and coordination functions.

44.2.1 Cluster Configuration

# Elasticsearch Custom Resource
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: logging-cluster
  namespace: elastic-system
spec:
  version: 8.5.0
  nodeSets:
  # Master Nodes
  - name: masters
    count: 3
    config:
      node.roles: ["master"]
      xpack.security.enabled: true
      # discovery bootstrap settings such as cluster.initial_master_nodes
      # are managed by the ECK operator and must not be set manually
    podTemplate:
      spec:
        containers:
        - name: elasticsearch
          resources:
            requests:
              memory: 2Gi
              cpu: 1
            limits:
              memory: 2Gi
              cpu: 2
          env:
          - name: ES_JAVA_OPTS
            value: "-Xms1g -Xmx1g"

  # Data Nodes
  - name: data
    count: 3
    config:
      node.roles: ["data", "ingest"]
      xpack.security.enabled: true
    podTemplate:
      spec:
        containers:
        - name: elasticsearch
          resources:
            requests:
              memory: 8Gi
              cpu: 2
            limits:
              memory: 8Gi
              cpu: 4
          env:
          - name: ES_JAVA_OPTS
            value: "-Xms4g -Xmx4g"
    volumeClaimTemplates:
    - metadata:
        name: elasticsearch-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 500Gi
        storageClassName: fast-ssd

44.2.2 Index Management Strategies

Index management implements time-based index creation with automatic index rollover:

# Index template for application logs
PUT _index_template/app-logs-template
{
  "index_patterns": ["app-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.refresh_interval": "30s",
      "index.codec": "best_compression",
      "index.query.default_field": [
        "message", "kubernetes.namespace", "kubernetes.pod"
      ]
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "level": {
          "type": "keyword"
        },
        "message": {
          "type": "text",
          "analyzer": "standard",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "kubernetes": {
          "properties": {
            "namespace": {"type": "keyword"},
            "pod": {"type": "keyword"},
            "container": {"type": "keyword"},
            "node": {"type": "keyword"}
          }
        }
      }
    }
  }
}

# ILM policy for log retention
PUT _ilm/policy/app-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "5GB",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "2d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "allocate": {
            "number_of_replicas": 0
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0
          },
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
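The rollover action in the hot phase only fires when writes go through an alias with a designated write index. A minimal bootstrap, assuming the policy and index pattern above (the `app-logs` alias name is illustrative):

```
# Additional settings for the index template above: bind the ILM policy
# and the rollover alias
"index.lifecycle.name": "app-logs-policy",
"index.lifecycle.rollover_alias": "app-logs"

# Bootstrap the first managed index with a write alias
PUT app-logs-000001
{
  "aliases": {
    "app-logs": { "is_write_index": true }
  }
}
```

After this, applications write to the `app-logs` alias and ILM rolls the concrete indices (`app-logs-000001`, `app-logs-000002`, ...) behind it.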

44.2.3 Shard Management

Node type     Shards per node       Heap size   Purpose
Master        0 (no data shards)    1-2 GB      Cluster management
Data          10-25                 4-8 GB      Log storage and search
Ingest        0-5                   2-4 GB      Log processing
Coordinator   0                     1-2 GB      Query coordination
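As a rough planning aid, the table's guidance can be turned into arithmetic. A sketch under common rules of thumb (roughly 30 GB target size per shard, about 20 shards per GB of heap; neither is an Elasticsearch hard limit):

```python
import math

def primary_shards(daily_gb: float, retention_days: int,
                   target_shard_gb: float = 30.0) -> int:
    """Primary shards needed to hold the retained log volume."""
    total_gb = daily_gb * retention_days
    return max(1, math.ceil(total_gb / target_shard_gb))

def max_shards_per_node(heap_gb: float, shards_per_gb: int = 20) -> int:
    """Rule-of-thumb ceiling on shards a single node can host."""
    return int(heap_gb * shards_per_gb)

print(primary_shards(50, 7))    # 350 GB retained → 12 shards
print(max_shards_per_node(4))   # 4 GB heap → at most 80 shards
```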

44.3 Fluentd Configuration and Log Processing

Fluentd serves as the central log collection and processing layer in the EFK stack.

44.3.1 Input Plugin Configuration

# Fluentd configuration for Kubernetes logs
<source>
  @type tail
  @id container_logs
  path "/var/log/containers/*.log"
  pos_file "/var/log/fluentd-containers.log.pos"
  tag kubernetes.*
  read_from_head true
  
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key time
      time_format %Y-%m-%dT%H:%M:%S.%NZ
    </pattern>
    <pattern>
      format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
      time_format %Y-%m-%dT%H:%M:%S.%N%:z
    </pattern>
  </parse>
</source>

# System Journal Logs
<source>
  @type systemd
  @id systemd-input
  tag systemd
  path /var/log/journal
  
  <storage>
    @type local
    persistent true
    path /var/log/fluentd-systemd.pos
  </storage>
  
  <entry>
    fields_strip_underscores true
    field_map {
      "MESSAGE": "message",
      "_SYSTEMD_UNIT": "systemd_unit",
      "_PID": "systemd_pid"
    }
  </entry>
</source>

44.3.2 Filter Processing Pipelines

# Kubernetes Metadata Filter
<filter kubernetes.**>
  @type kubernetes_metadata
  @id kubernetes_metadata
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  preserve_json_log true
  merge_json_log true
  de_dot false
  annotation_match [ ".*" ]
</filter>

# Log Enhancement Filter
<filter kubernetes.**>
  @type record_transformer
  @id kubernetes_record_transformer
  enable_ruby true
  
  <record>
    cluster_name "#{ENV['CLUSTER_NAME'] || 'openshift-cluster'}"
    log_level ${record.dig("kubernetes", "labels", "log-level") || "info"}
    app_name ${record.dig("kubernetes", "labels", "app") || "unknown"}
    environment ${record.dig("kubernetes", "namespace_labels", "environment") || "unknown"}
  </record>
  
  remove_keys kubernetes.pod_id
</filter>

# Multi-line processing for Java stack traces
<filter kubernetes.**>
  @type concat
  key log
  stream_identity_key stream
  multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
  separator ""
  timeout_label @NORMAL
</filter>

# PII scrubbing filter
<filter **>
  @type record_modifier
  <record>
    # Chain both substitutions in one expression; two <record> entries
    # with the same key would overwrite each other, losing the first mask
    message ${record["message"]&.gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, "****-****-****-****")&.gsub(/\b[\w\.-]+@[\w\.-]+\.\w+\b/, "***@***.***")}
  </record>
</filter>
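The two Ruby `gsub` patterns translate directly into Python's `re` module, which makes them easy to unit-test outside Fluentd. A sketch with the patterns copied from the filter above:

```python
import re

# Same patterns as in the Fluentd PII scrubbing filter
CARD = re.compile(r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b")
EMAIL = re.compile(r"\b[\w.-]+@[\w.-]+\.\w+\b")

def scrub(message: str) -> str:
    """Apply both substitutions in sequence, like the chained gsub calls."""
    message = CARD.sub("****-****-****-****", message)
    return EMAIL.sub("***@***.***", message)

print(scrub("card 4111-1111-1111-1111 from user@example.com"))
# → card ****-****-****-**** from ***@***.***
```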

44.3.3 Output Plugin Configuration

# Elasticsearch Output
<match **>
  @type elasticsearch
  @id elasticsearch_output
  host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
  port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
  scheme https
  ssl_verify true
  user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
  password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
  
  logstash_format true
  logstash_prefix app-logs
  logstash_dateformat %Y.%m.%d
  
  include_tag_key true
  tag_key fluentd_tag
  
  <buffer>
    @type file
    path /var/log/fluentd-buffers/kubernetes.system.buffer
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 5s
    retry_forever true
    retry_max_interval 30
    chunk_limit_size 2M
    queue_limit_length 8
    overflow_action block
  </buffer>
</match>

# Error Handling
<label @ERROR>
  <match **>
    @type file
    @id error_output
    path /var/log/fluentd-error.log
    append true
    
    <buffer>
      flush_mode immediate
    </buffer>
  </match>
</label>

44.3.4 Buffer Management Strategies

Buffer type   Use case          Advantages                  Disadvantages
Memory        High throughput   Fast, low latency           Data loss on crash
File          Reliability       Persistent, no data loss    Slower I/O
Hybrid        Balanced          Speed plus safety           More complex configuration

44.4 Kibana Visualization and Dashboard Management

Kibana provides the user interface for log search, analysis, and visualization.

44.4.1 Index Pattern Configuration

# Create an index pattern via the Kibana API
curl -X POST "kibana:5601/api/saved_objects/index-pattern/app-logs-*" \
  -H "Content-Type: application/json" \
  -H "kbn-xsrf: true" \
  -d '{
    "attributes": {
      "title": "app-logs-*",
      "timeFieldName": "@timestamp",
      "fields": "[{\"name\":\"@timestamp\",\"type\":\"date\",\"searchable\":true,\"aggregatable\":true},{\"name\":\"level\",\"type\":\"string\",\"searchable\":true,\"aggregatable\":true},{\"name\":\"message\",\"type\":\"string\",\"searchable\":true,\"aggregatable\":false}]"
    }
  }'

44.4.2 Using the Discovery Interface

# Kibana query DSL for log search
# Error logs from the last 24 hours
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "level": "ERROR"
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": "now-24h"
            }
          }
        }
      ],
      "filter": [
        {
          "term": {
            "kubernetes.namespace.keyword": "production"
          }
        }
      ]
    }
  },
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}

44.4.3 Visualization Components

# Kibana visualization for log-level distribution
{
  "version": "8.5.0",
  "objects": [
    {
      "id": "log-level-pie",
      "type": "visualization",
      "attributes": {
        "title": "Log Level Distribution",
        "visState": {
          "type": "pie",
          "params": {
            "addTooltip": true,
            "addLegend": true,
            "legendPosition": "right"
          },
          "aggs": [
            {
              "id": "1",
              "type": "count",
              "schema": "metric",
              "params": {}
            },
            {
              "id": "2", 
              "type": "terms",
              "schema": "segment",
              "params": {
                "field": "level.keyword",
                "size": 10,
                "order": "desc",
                "orderBy": "1"
              }
            }
          ]
        },
        "kibanaSavedObjectMeta": {
          "searchSourceJSON": {
            "index": "app-logs-*",
            "query": {
              "match_all": {}
            },
            "filter": []
          }
        }
      }
    }
  ]
}

44.4.4 Dashboard Composition

# Complete dashboard for application monitoring
{
  "version": "8.5.0",
  "objects": [
    {
      "id": "app-monitoring-dashboard",
      "type": "dashboard",
      "attributes": {
        "title": "Application Monitoring Dashboard",
        "panelsJSON": [
          {
            "gridData": {"x": 0, "y": 0, "w": 24, "h": 15},
            "panelIndex": "1",
            "version": "8.5.0",
            "panelRefName": "panel_1"
          },
          {
            "gridData": {"x": 24, "y": 0, "w": 24, "h": 15},
            "panelIndex": "2",
            "version": "8.5.0", 
            "panelRefName": "panel_2"
          }
        ],
        "timeRestore": true,
        "timeTo": "now",
        "timeFrom": "now-24h",
        "refreshInterval": {
          "pause": false,
          "value": 30000
        }
      }
    }
  ]
}

44.5 Log Data Processing and Enrichment

Advanced log processing improves the searchability and analytical value of log data.

44.5.1 Log Normalization

# Fluentd normalization across different applications
<filter kubernetes.**>
  @type record_transformer
  enable_ruby true
  
  <record>
    # Normalize log levels
    normalized_level ${
      case record["level"]&.downcase
      when "warn", "warning" then "WARNING"
      when "err", "error", "fatal" then "ERROR"
      when "debug", "trace" then "DEBUG"
      when "info", "information" then "INFO"
      else (record["level"]&.upcase || "UNKNOWN")
      end
    }
    
    # Extract the HTTP status code
    http_status ${
      if record["message"] =~ /HTTP.*?(\d{3})/
        $1.to_i
      else
        nil
      end
    }
    
    # Parse Response Time
    response_time_ms ${
      if record["message"] =~ /(\d+(?:\.\d+)?)ms/
        $1.to_f
      else
        nil
      end
    }
  </record>
</filter>
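The Ruby case expression for level normalization is easy to mirror (and test) outside of Fluentd. A Python sketch of the same mapping:

```python
# Same mapping as the Ruby case expression in the filter above
LEVEL_MAP = {
    "warn": "WARNING", "warning": "WARNING",
    "err": "ERROR", "error": "ERROR", "fatal": "ERROR",
    "debug": "DEBUG", "trace": "DEBUG",
    "info": "INFO", "information": "INFO",
}

def normalize_level(level):
    """Fold vendor-specific level names onto one canonical set."""
    if level is None:
        return "UNKNOWN"
    return LEVEL_MAP.get(level.lower(), level.upper())

print(normalize_level("warn"))   # → WARNING
print(normalize_level("Fatal"))  # → ERROR
```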

44.5.2 Geo-Location Enrichment

# GeoIP enrichment for IP addresses
<filter **>
  @type geoip
  
  # GeoIP Database Path
  geoip_database "/etc/fluentd/GeoLite2-City.mmdb"
  
  # Extract IP from message
  <record>
    geoip_city ${geoip_city['client_ip']}
    geoip_country ${geoip_country_name['client_ip']}  
    geoip_location ${geoip_location['client_ip']}
  </record>
  
  remove_keys client_ip
</filter>

44.5.3 User Agent Parsing

# User agent parser for HTTP logs
<filter nginx.**>
  @type parser
  key_name message
  reserve_data true
  
  <parse>
    @type regexp
    expression /^(?<remote_addr>[^ ]*) .* \[(?<time_local>[^\]]*)\] "(?<method>[^ ]*) (?<request>[^ ]*) (?<protocol>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"$/
    time_format %d/%b/%Y:%H:%M:%S %z
  </parse>
</filter>

<filter nginx.**>
  @type ua_parser
  key_name http_user_agent
  out_key ua
  flatten true
</filter>

44.6 Performance Optimization

Large EFK deployments require targeted performance tuning.

44.6.1 Elasticsearch Performance Tuning

# Optimized Elasticsearch configuration
elasticsearch.yml: |
  cluster.name: logging-cluster
  
  # JVM Heap Sizing (50% of available RAM, max 32GB)
  # Set via ES_JAVA_OPTS: -Xms4g -Xmx4g
  
  # Thread Pool Settings
  thread_pool:
    write:
      size: 4
      queue_size: 1000
    search:
      size: 6
      queue_size: 1000
  
  # Index settings (a YAML mapping may define each key only once, so all
  # indices.* settings live in a single block)
  indices:
    memory:
      index_buffer_size: 20%
    fielddata:
      cache:
        size: 15%
    queries:
      cache:
        size: 15%
  
  # Network settings
  network:
    host: 0.0.0.0
    tcp:
      keep_alive: true
      reuse_address: true
  
  # Discovery settings (discovery.zen.minimum_master_nodes is obsolete
  # since Elasticsearch 7; seed_hosts is sufficient here)
  discovery:
    seed_hosts: ["master-0", "master-1", "master-2"]
  
  # Performance settings
  bootstrap:
    memory_lock: true
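The heap-sizing comment above ("50% of available RAM, max 32GB") reduces to one line of arithmetic. A sketch; the 31 GB cap conservatively stays below the compressed-oops threshold:

```python
def heap_gb(container_mem_gb: float, cap_gb: float = 31.0) -> float:
    """Elasticsearch heap: half the container memory, capped below ~32 GB."""
    return min(container_mem_gb / 2, cap_gb)

print(heap_gb(8))    # → 4.0, matches -Xms4g -Xmx4g on the data nodes
print(heap_gb(128))  # → 31.0, capped to preserve compressed oops
```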

44.6.2 Fluentd Performance Optimization

# Optimized Fluentd worker configuration
<system>
  workers 4
  root_dir /var/log/fluentd
  log_level info
  suppress_repeated_stacktrace true
</system>

# Optimized Buffer Settings
<match **>
  @type elasticsearch
  
  <buffer>
    @type file
    path /var/log/fluentd-buffers/elasticsearch
    
    # Flush Settings
    flush_mode interval
    flush_interval 5s
    flush_thread_count 4
    
    # Chunk Settings  
    chunk_limit_size 8MB
    chunk_limit_records 10000
    
    # Queue Settings
    queue_limit_length 64
    overflow_action drop_oldest_chunk
    
    # Retry Settings (retry_forever would make Fluentd ignore
    # retry_timeout, so only the bounded timeout is set here)
    retry_type exponential_backoff
    retry_wait 1s
    retry_max_interval 60s
    retry_timeout 1h
  </buffer>
</match>

44.6.3 Storage Optimization

# Elasticsearch index optimization
PUT /app-logs-*/_settings
{
  "index": {
    "refresh_interval": "30s",
    "number_of_replicas": 0,
    "translog": {
      "flush_threshold_size": "1gb",
      "sync_interval": "30s"
    },
    "merge": {
      "policy": {
        "max_merged_segment": "5gb"
      }
    }
  }
}

# Force merge for older indices
POST /app-logs-2024.01.*/_forcemerge?max_num_segments=1&wait_for_completion=false

44.7 Security Integration

EFK stack security covers authentication, authorization, and encryption.

44.7.1 Elasticsearch Security Configuration

# Elasticsearch Security Settings
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: elastic-certificates.p12

# Built-in Users
PUT /_security/user/kibana_system/_password
{
  "password": "kibana_system_password"
}

PUT /_security/user/elastic/_password  
{
  "password": "elastic_password"
}

44.7.2 RBAC for Log Access

# Elasticsearch role for namespace-scoped access
PUT /_security/role/logs_reader_production
{
  "indices": [
    {
      "names": ["app-logs-*"],
      "privileges": ["read"],
      "query": {
        "term": {
          "kubernetes.namespace.keyword": "production"
        }
      }
    }
  ]
}

# User with restricted permissions
PUT /_security/user/prod_developer
{
  "password": "secure_password",
  "roles": ["logs_reader_production"],
  "full_name": "Production Developer",
  "email": "prod.dev@company.com"
}

44.7.3 Log Data Masking

# Sensitive Data Masking in Fluentd
<filter **>
  @type record_modifier
  <replace>
    key message
    expression /password[=:]\s*\S+/i
    replace password=***MASKED***
  </replace>
  
  <replace>
    key message  
    expression /token[=:]\s*[\w\-\.]+/i
    replace token=***MASKED***
  </replace>
  
  <replace>
    key message
    expression /\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/
    replace ****-****-****-****
  </replace>
</filter>

44.8 Alerting and Automated Responses

The EFK stack supports advanced alerting based on log content.

44.8.1 Elasticsearch Watcher

# Watcher for a high error rate
PUT _watcher/watch/high_error_rate
{
  "trigger": {
    "schedule": {
      "interval": "2m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": ["app-logs-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "level": "ERROR"
                  }
                },
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-5m"
                    }
                  }
                }
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 50
      }
    }
  },
  "actions": {
    "send_email": {
      "email": {
        "to": ["ops-team@company.com"],
        "subject": "High Error Rate Alert",
        "body": "{{ctx.payload.hits.total}} errors detected in the last 5 minutes"
      }
    },
    "create_incident": {
      "webhook": {
        "scheme": "https",
        "host": "pagerduty.com",
        "port": 443,
        "method": "post",
        "path": "/integration/{{ctx.metadata.pagerduty_key}}/enqueue",
        "body": {
          "service_key": "{{ctx.metadata.pagerduty_key}}",
          "event_type": "trigger",
          "description": "High error rate: {{ctx.payload.hits.total}} errors"
        }
      }
    }
  }
}

44.8.2 Anomaly Detection

# Machine learning job for log anomaly detection
PUT _ml/anomaly_detectors/log-rate-anomaly
{
  "description": "Detect anomalies in log rate",
  "analysis_config": {
    "bucket_span": "5m",
    "detectors": [
      {
        "function": "count",
        "detector_description": "Count of log messages"
      }
    ],
    "influencers": [
      "kubernetes.namespace.keyword",
      "level.keyword"
    ]
  },
  "data_description": {
    "time_field": "@timestamp"
  }
}

# Datafeed for the ML job
PUT _ml/datafeeds/datafeed-log-rate-anomaly
{
  "job_id": "log-rate-anomaly",
  "indices": ["app-logs-*"],
  "query": {
    "match_all": {}
  }
}

44.9 Multi-Cluster Integration

For large organizations, multi-cluster log aggregation is essential.

44.9.1 Cross-Cluster Log Aggregation

# Fluentd configuration for multi-cluster log collection
<source>
  @type forward
  @id forward_input
  port 24224
  bind 0.0.0.0
  
  <transport tls>
    cert_path /fluentd/ssl/server.crt
    private_key_path /fluentd/ssl/server.key
    client_cert_auth true
  </transport>
  
  <security>
    self_hostname central-fluentd
    shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
  </security>
</source>

# Output to the central Elasticsearch
<match cluster.**>
  @type elasticsearch
  host central-elasticsearch.logging.svc.cluster.local
  port 9200
  
  logstash_format true
  logstash_prefix multi-cluster-logs
  
  <buffer>
    @type file
    path /var/log/fluentd-buffers/multi-cluster.buffer
    flush_interval 10s
    chunk_limit_size 8MB
  </buffer>
</match>
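
On the workload-cluster side, a matching forwarder output is needed. A sketch; the hostname, CA path, and shared key are illustrative, and tags are assumed to be rewritten to a `cluster.*` prefix upstream so the central `<match cluster.**>` rule catches them:

```
# Forwarder side (one per workload cluster), counterpart to the
# <source> block above
<match **>
  @type forward
  
  transport tls
  tls_cert_path /fluentd/ssl/ca.crt
  tls_verify_hostname true
  
  <security>
    self_hostname "#{Socket.gethostname}"
    shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
  </security>
  
  <server>
    host central-fluentd.example.com
    port 24224
  </server>
  
  <buffer>
    @type file
    path /var/log/fluentd-buffers/forward.buffer
    flush_interval 10s
  </buffer>
</match>
```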

# Cross-cluster search in Elasticsearch
PUT /_cluster/settings
{
  "persistent": {
    "cluster.remote.cluster_one.seeds": ["cluster1-master:9300"],
    "cluster.remote.cluster_two.seeds": ["cluster2-master:9300"]
  }
}

# Search across clusters
GET /local-logs-*,cluster_one:logs-*,cluster_two:logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        {"match": {"level": "ERROR"}},
        {"range": {"@timestamp": {"gte": "now-1h"}}}
      ]
    }
  }
}