O planejador de fluxo de ar falha ao iniciar com o executor kubernetes

12

Estou usando o https://github.com/helm/charts/tree/master/stable/airflow helm chart e construindo a puckle/docker-airflowimagem v1.10.8 com o kubernetes instalado e usando essa imagem no gráfico de helm, mas continuo recebendo

  File "/usr/local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 1140, in initdb
    db.initdb(settings.RBAC)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 332, in initdb
    dagbag = models.DagBag()
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 95, in __init__
    executor = get_default_executor()
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/__init__.py", line 48, in get_default_executor
    DEFAULT_EXECUTOR = _get_executor(executor_name)
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/__init__.py", line 87, in _get_executor
    return KubernetesExecutor()
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 702, in __init__
    self.kube_config = KubeConfig()
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 283, in __init__
    self.kube_client_request_args = json.loads(kube_client_request_args)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)

No meu agendador, também como várias fontes aconselham, tentei definir:

AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: {"_request_timeout" : [60,60] }

nos meus valores de leme. que também não funcionou ninguém tem alguma idéia do que estou perdendo?

Aqui estão os meus valores.yaml


airflow:
  image:
     repository: airflow-docker-local
     tag: 1.10.8
  executor: Kubernetes
  service:
    type: LoadBalancer
  config:
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: airflow-docker-local
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: 1.10.8
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_IMAGE_PULL_POLICY: Never

    AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME: airflow
    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM: airflow
    AIRFLOW__KUBERNETES__NAMESPACE: airflow
    AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: {"_request_timeout" : [60,60] }

    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@airflow-postgresql:5432/airflow

persistence:
  enabled: true
  existingClaim: ''

workers:
  enabled: false

postgresql:
  enabled: true

redis:
  enabled: false

EDIT:

Várias tentativas de definir variáveis ​​de ambiente em valores de leme. Yaml não funcionou, depois disso eu adicionei (preste atenção às aspas duplas e simples)

ENV AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS='{"_request_timeout" : [60,60] }'

para o Dockerfile aqui: https://github.com/puckel/docker-airflow/blob/1.10.9/Dockerfile#L19 depois que meu airflow-schedulerpod iniciar, mas continuarei recebendo o seguinte erro no pod do agendador.

Process KubernetesJobWatcher-9: Traceback (most recent call last): 
    File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 313, 
    in recv_into return self.connection.recv_into(*args, **kwargs) File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", 
    line 1840, in recv_into self._raise_ssl_error(self._ssl, result) File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", 
    line 1646, in _raise_ssl_error raise WantReadError() OpenSSL.SSL.WantReadError
Asav Patel
fonte
mesma questão aqui. Fiz o check-out docker-airflow:1.10.8e removi o L931 alterado de config/airflow.cfgpara kube_client_request_args =e use esta imagem. Parece funcionar
Raf
mesmo problema aqui :-(
LiorH 17/02

Respostas:

3

Para o valor do leme, o modelo usa um loop que coloca o airflow.configmapa entre aspas duplas" . Isso significa que qualquer "um em um valor precisa ser escapado para que o YAML com modelo de saída seja válido.

airflow:
  config:
    AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{\"_request_timeout\":60}'

Implanta e executa (mas não concluí um teste de ponta a ponta)

De acordo com esse problema do github , o tempo limite do SSL do agendador python pode não ser um problema, pois o observador inicia novamente após o tempo limite de conexão de 60 segundos.

Matt
fonte
sim funcionou, obrigado :-)
LiorH 27/02
Sem problemas! Você é capaz de verificar se o agendador funciona, ele se reconecta a cada X segundos?
Matt
Sim, o planejador funciona. Correndo para um monte de outras questões :-(
LiorH