ArgoWorkflow Tutorial (5) --- Workflow Trigger Modes: Manual, Scheduled Tasks, and Event Triggers

Published: 2024-09-25 09:47:47

The previous article covered archiving in argo-workflow, including pipeline GC, pipeline archiving, and log archiving. This article covers the ways a Workflow can be triggered: manually, on a schedule, and by Event.

1. Overview

Argo Workflows pipelines can be triggered in several ways:

  • Manual trigger: submitting a Workflow by hand triggers one run, so the pipelines we create should, in principle, be WorkflowTemplate objects.
  • Scheduled trigger: CronWorkflow. Much like the relationship between job and cronjob in k8s, a CronWorkflow creates Workflows on a schedule.
  • Event trigger: for example, triggering on a git commit. argo-events can be used to implement this.

2. Scheduled Trigger

A CronWorkflow is essentially a Workflow plus a Cron Spec.

Its design follows the CronJob in k8s.

Demo

A simple CronWorkflow looks like this:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 90"]

After applying it, you can see that the created Workflow is named $cronWorkflowName-xxx:

[root@lixd-argo workdir]# k get cwf
NAME           AGE
test-cron-wf   116s
[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE   MESSAGE
test-cron-wf-1711852560   Running   47s

Because the task in the template runs sleep 90, the whole run necessarily takes longer than 60s. With concurrencyPolicy set to Replace, the second Workflow is created after 60s and the first one is stopped.

[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE    MESSAGE
test-cron-wf-1711852560   Failed    103s   Stopped with strategy 'Terminate'
test-cron-wf-1711852620   Running   43s

Parameters

The supported parameters are as follows:

type CronWorkflowSpec struct {
	// WorkflowSpec is the spec of the workflow to be run
	WorkflowSpec WorkflowSpec `json:"workflowSpec" protobuf:"bytes,1,opt,name=workflowSpec,casttype=WorkflowSpec"`
	// Schedule is a schedule to run the Workflow in Cron format
	Schedule string `json:"schedule" protobuf:"bytes,2,opt,name=schedule"`
	// ConcurrencyPolicy is the K8s-style concurrency policy that will be used
	ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
	// Suspend is a flag that will stop new CronWorkflows from running if set to true
	Suspend bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
	// StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its
	// original scheduled time if it is missed.
	StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,5,opt,name=startingDeadlineSeconds"`
	// SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time
	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
	// FailedJobsHistoryLimit is the number of failed jobs to be kept at a time
	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
	// Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
	Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
	// WorkflowMetadata contains some metadata of the workflow to be run
	WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
}

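The content falls into 3 parts:

  • WorkflowSpec: the Spec of the Workflow itself, exactly the same as in a Workflow
  • Cron Spec: the additional Cron-related fields
  • WorkflowMetadata: metadata that every Workflow created by this CronWorkflow will carry

WorkflowSpec and WorkflowMetadata need little explanation, so the focus here is on the Cron Spec fields:

  • schedule: the cron expression. * * * * * creates a Workflow every minute
  • concurrencyPolicy: the concurrency mode. Allow, Forbid, and Replace are supported
    • Allow: multiple Workflows may run at the same time
    • Forbid: no concurrency. While a Workflow is running, no new one is created
    • Replace: the newly created Workflow replaces the old one, so multiple Workflows never run at the same time.
  • startingDeadlineSeconds: the K8s-style deadline, in seconds, that limits how long after a missed scheduled time the Workflow may still be started.
  • suspend: a flag that pauses the CronWorkflow. Set it to true when the scheduled task should not run.
  • timezone: the time zone. The default is the machine's local time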
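The three concurrencyPolicy values can be summarized as a small decision function. This is only an illustrative sketch: `decide` and its return values are hypothetical names, and the code is not taken from the Argo controller.

```go
package main

import "fmt"

type ConcurrencyPolicy string

const (
	Allow   ConcurrencyPolicy = "Allow"
	Forbid  ConcurrencyPolicy = "Forbid"
	Replace ConcurrencyPolicy = "Replace"
)

// decide is a hypothetical helper (not Argo's code) that returns what should
// happen when a schedule fires while `running` earlier Workflows are active.
func decide(policy ConcurrencyPolicy, running int) (startNew, stopOld bool) {
	if running == 0 {
		return true, false // nothing to conflict with
	}
	switch policy {
	case Forbid:
		return false, false // skip this run, keep the old one
	case Replace:
		return true, true // stop the old one, start the new one
	default: // Allow
		return true, false // run both side by side
	}
}

func main() {
	for _, p := range []ConcurrencyPolicy{Allow, Forbid, Replace} {
		startNew, stopOld := decide(p, 1)
		fmt.Printf("%-7s startNew=%t stopOld=%t\n", p, startNew, stopOld)
	}
}
```

The Replace row corresponds to the demo above, where the first Workflow was stopped as soon as the second one started.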

Most of the fields are the same as in a K8s CronJob.

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: my-cron
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 10"]
  workflowMetadata:
    labels:
      from: cron

This version adds metadata. Let's test it:

[root@lixd-argo workdir]# k get wf my-cron-1711853400 -oyaml|grep labels -A 1
  labels:
    from: cron

As you can see, the created Workflow does carry the label specified in the CronWorkflow.

3. Event

argo provides an Event API: /api/v1/events/{namespace}/{discriminator}. This API accepts arbitrary JSON data.

Workflows can be created through the event API, much like a Webhook.

A request looks like this:

curl https://localhost:2746/api/v1/events/argo/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

Or like this:

curl https://localhost:2746/api/v1/events/argo/my-discriminator \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

  • 1) Prepare a Token
  • 2) Create a WorkflowEventBinding that configures which events to accept and which Workflow to create when an event arrives
  • 3) Send a request to test it
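The curl calls above can also be sketched in Go. The helper name `newEventRequest`, the example token, and the JSON Content-Type header are assumptions made for illustration; only the URL layout and the Authorization header come from the text.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
)

// newEventRequest builds a POST to /api/v1/events/{namespace}/{discriminator}.
// The helper name and parameters are illustrative; token is the full header
// value, for example "Bearer <token>".
func newEventRequest(server, namespace, discriminator, token string, payload []byte) (*http.Request, error) {
	endpoint := fmt.Sprintf("%s/api/v1/events/%s/%s",
		server, url.PathEscape(namespace), url.PathEscape(discriminator))
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", token)
	req.Header.Set("Content-Type", "application/json") // assumption, not required by the text
	return req, nil
}

func main() {
	req, err := newEventRequest("https://localhost:2746", "argo", "my-discriminator",
		"Bearer example-token", []byte(`{"message": "hello"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
	// To actually send it: resp, err := http.DefaultClient.Do(req)
}
```

Passing an empty discriminator yields the first form of the URL, which ends with the namespace and a trailing slash.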

Token

Create the RBAC objects (role, rolebinding, sa). The role only needs the minimum permissions.

Create them directly in the default namespace:

kubectl apply -f - <

The serviceaccount and rolebinding:

kubectl create sa test

kubectl create rolebinding test --role=test --serviceaccount=default:test

Then create a Secret:

kubectl apply -f - <

Now the Token can be read from the Secret:

ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"

echo $ARGO_TOKEN
Bearer ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNkltS...

Test whether it works:

ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')

curl http://$ARGO_SERVER:2746/api/v1/workflow-event-bindings/default -H "Authorization: $ARGO_TOKEN"

WorkflowEventBinding

To receive Events, create a WorkflowEventBinding object, as follows:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowEventBinding
metadata:
  name: event-consumer
spec:
  event:
    # metadata header name must be lowercase to match in selector
    selector: payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
  submit:
    workflowTemplateRef:
      name: my-wf-tmple
    arguments:
      parameters:
      - name: message
        valueFrom:
          event: payload.message

spec.event specifies how this Binding matches incoming Events. The conditions here are:

  • 1) the payload contains a message parameter whose value is not empty
  • 2) the headers contain x-argo-e2e with the value true
    • Note: header names are converted to lowercase before matching
  • 3) the discriminator is named my-discriminator

If the Event matches, a Workflow is created from the content under submit:

  • 1) the my-wf-tmple workflowTemplate is used to create the Workflow
  • 2) payload.message is passed as the parameter

The Workflow that gets created is defined by my-wf-tmple, so create that Template first:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: my-wf-tmple
spec:
  templates:
    - name: main
      inputs:
        parameters:
          - name: message
            value: "{{workflow.parameters.message}}"
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]
  entrypoint: main

Finally, we can call the API to send an event and have the Workflow created:

curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
    -H "Authorization: $ARGO_TOKEN" \
    -H "X-Argo-E2E: true" \
    -d '{"message": "hello events"}'

Let's test it:

{}[root@lixd-argo workdir]# curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
>     -H "Authorization: $ARGO_TOKEN" \
>     -H "X-Argo-E2E: true" \
>     -d '{"message": "hello events"}'
{}[root@lixd-argo workdir]# k get wf
NAME                STATUS    AGE   MESSAGE
my-wf-tmple-ea81n   Running   5s
[root@lixd-argo workdir]# k get wf my-wf-tmple-ea81n -oyaml|grep parameters -A 5
    parameters:
    - name: message
      value: hello events

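As you can see, the Workflow has been created, and its parameter is the hello events we sent in the request.

Scaling

By default argo-server can process 64 events at the same time. Beyond that it returns 503 directly. This can be tuned with the following flags:

  • 1) --event-operation-queue-size: increase the queue size to accept more events
  • 2) --event-worker-count: increase the number of workers to speed up processing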
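The interaction between queue size and worker count can be pictured with a bounded channel drained by a fixed pool of goroutines. This is a generic model of that pattern, not argo-server's implementation, and every name in it is made up for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
)

// dispatcher is an illustrative model, not Argo's implementation: a bounded
// queue drained by a fixed number of workers.
type dispatcher struct {
	queue chan string
	wg    sync.WaitGroup
}

func newDispatcher(queueSize, workers int, handle func(string)) *dispatcher {
	d := &dispatcher{queue: make(chan string, queueSize)}
	for i := 0; i < workers; i++ {
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for ev := range d.queue {
				handle(ev)
			}
		}()
	}
	return d
}

// submit returns 200 if the event was queued and 503 if the queue is full.
func (d *dispatcher) submit(ev string) int {
	select {
	case d.queue <- ev:
		return http.StatusOK
	default:
		return http.StatusServiceUnavailable
	}
}

// close stops accepting events and waits for workers to drain the queue.
func (d *dispatcher) close() {
	close(d.queue)
	d.wg.Wait()
}

func main() {
	// With no workers nothing drains the queue, so the third event is rejected.
	d := newDispatcher(2, 0, func(string) {})
	for i := 1; i <= 3; i++ {
		fmt.Println(i, d.submit(fmt.Sprintf("event-%d", i)))
	}
	d.close()
}
```

In this model a larger queue absorbs bursts, while more workers raise the sustained rate at which the queue empties.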

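4. Webhook

The Event section showed that a Workflow can be created by sending an HTTP request that triggers an event, but the client has to supply an auth token.

That is a problem for clients that cannot set a Token. Git hosts such as Github and Gitlab, for example, let you configure a Webhook that is called on each commit to trigger a pipeline.

Requests sent this way carry no Token, so extra configuration is needed for verification, ensuring that argo only handles Webhook requests that really come from Github, Gitlab, and similar platforms.

  • 1) Create the RBAC objects (role, rolebinding, sa) and prepare the token
  • 2) Configure webhook-clients to tell argo which secret to use as the token for each type of incoming Webhook

The Token step is the same as in the Event section and is not repeated here. The second step is the important one.

webhook-clients config

After the previous step (creating the RBAC objects and preparing the Secret), a client would normally read the Token from the Secret and send its requests with that Token, like this: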

ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
ARGO_TOKEN="Bearer $(kubectl get secret jenkins.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"

curl https://$ARGO_SERVER:2746/api/v1/events/default/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

A Webhook client, however, cannot specify a token this way, so the argo-workflows-webhook-clients configuration is used to tell argo which Webhook uses the token from which Secret.

Create a Secret named argo-workflows-webhook-clients with content roughly like this:

kind: Secret
apiVersion: v1
metadata:
  name: argo-workflows-webhook-clients
# The data keys must be the name of a service account.
stringData:
  # https://support.atlassian.com/bitbucket-cloud/docs/manage-webhooks/
  bitbucket.org: |
    type: bitbucket
    secret: "my-uuid"
  # https://confluence.atlassian.com/bitbucketserver/managing-webhooks-in-bitbucket-server-938025878.html
  bitbucketserver: |
    type: bitbucketserver
    secret: "shh!"
  # https://developer.github.com/webhooks/securing/
  github.com: |
    type: github
    secret: "shh!"
  # https://docs.gitlab.com/ee/user/project/integrations/webhooks.html
  gitlab.com: |
    type: gitlab
    secret: "shh!"
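
  • Each Key must be the name of a Serviceaccount in the current Namespace.
  • Each Value contains two parts, type and secret.
    • type: the Webhook source, such as github or gitlab
    • secret: a plain string, not a k8s secret. It is normally set on the corresponding platform when the Webhook is added

Taking Github as an example, the secret is configured as follows:

When adding the Webhook you can fill in a Secret field. It is just an arbitrary string, so any value works.

Github then includes information derived from this Secret in every Webhook request it sends. When Argo receives the request, it checks it against the secret field of the type=github entry in the argo-workflows-webhook-clients Secret. If it matches, the request is processed; otherwise it is ignored.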

If the request matches, the Token is read from the corresponding Serviceaccount and used as the Authorization information.
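For intuition about what "matching" a secret means, the sketch below follows the scheme GitHub documents for its X-Hub-Signature-256 header: an HMAC-SHA256 of the request body keyed with the shared secret. It is not the parser Argo uses, and the `sign` and `verify` helpers are names chosen for this example.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign computes the value GitHub documents for the X-Hub-Signature-256 header.
func sign(secret string, body []byte) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the signature and compares it in constant time.
func verify(secret string, body []byte, header string) bool {
	return hmac.Equal([]byte(sign(secret, body)), []byte(header))
}

func main() {
	body := []byte(`{"ref": "refs/heads/main"}`)
	header := sign("shh!", body) // what the sender would attach
	fmt.Println(verify("shh!", body, header))
	fmt.Println(verify("wrong", body, header))
}
```

The secret itself never travels with the request in this scheme; only the signature does, which is why the receiver needs its own copy of the secret.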

Source Code Analysis

The official documentation only touches on Webhooks briefly, so I looked through the source code.

The logic is implemented as an Interceptor that every Event API request passes through. It adds Authorization information to requests that arrive without it.

// Interceptor creates an annotator that verifies webhook signatures and adds the appropriate access token to the request.
func Interceptor(client kubernetes.Interface) func(w http.ResponseWriter, r *http.Request, next http.Handler) {
	return func(w http.ResponseWriter, r *http.Request, next http.Handler) {
		err := addWebhookAuthorization(r, client)
		if err != nil {
			log.WithError(err).Error("Failed to process webhook request")
			w.WriteHeader(403)
			// hide the message from the user, because it could help them attack us
			_, _ = w.Write([]byte(`{"message": "failed to process webhook request"}`))
		} else {
			next.ServeHTTP(w, r)
		}
	}
}

Inside it, addWebhookAuthorization tries to add the authentication information:

func addWebhookAuthorization(r *http.Request, kube kubernetes.Interface) error {
	// try and exit quickly before we do anything API calls
	if r.Method != "POST" || len(r.Header["Authorization"]) > 0 || !strings.HasPrefix(r.URL.Path, pathPrefix) {
		return nil
	}
	parts := strings.SplitN(strings.TrimPrefix(r.URL.Path, pathPrefix), "/", 2)
	if len(parts) != 2 {
		return nil
	}
	namespace := parts[0]
	secretsInterface := kube.CoreV1().Secrets(namespace)
	ctx := r.Context()

	webhookClients, err := secretsInterface.Get(ctx, "argo-workflows-webhook-clients", metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get webhook clients: %w", err)
	}
	// we need to read the request body to check the signature, but we still need it for the GRPC request,
	// so read it all now, and then reinstate when we are done
	buf, _ := io.ReadAll(r.Body)
	defer func() { r.Body = io.NopCloser(bytes.NewBuffer(buf)) }()
	serviceAccountInterface := kube.CoreV1().ServiceAccounts(namespace)
	for serviceAccountName, data := range webhookClients.Data {
		r.Body = io.NopCloser(bytes.NewBuffer(buf))
		client := &webhookClient{}
		err := yaml.Unmarshal(data, client)
		if err != nil {
			return fmt.Errorf("failed to unmarshal webhook client \"%s\": %w", serviceAccountName, err)
		}
		log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Debug("Attempting to match webhook request")
		ok := webhookParsers[client.Type](client.Secret, r)
		if ok {
			log.WithField("serviceAccountName", serviceAccountName).Debug("Matched webhook request")
			serviceAccount, err := serviceAccountInterface.Get(ctx, serviceAccountName, metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("failed to get service account \"%s\": %w", serviceAccountName, err)
			}
			tokenSecret, err := secretsInterface.Get(ctx, secrets.TokenNameForServiceAccount(serviceAccount), metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("failed to get token secret \"%s\": %w", tokenSecret, err)
			}
			r.Header["Authorization"] = []string{"Bearer " + string(tokenSecret.Data["token"])}
			return nil
		}
	}
	return nil
}

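The core steps are:

  • First, a check: Authorization is only added automatically for POST requests whose Authorization header is empty.
  • Then the Secret named argo-workflows-webhook-clients is fetched from the Namespace given in the API path.
  • Finally, a loop checks whether the type and secret of each entry in the Secret match the current request. On a match, the entry's key in data is used as the serviceaccount name to look up the token, which is then used as the Authorization.

The third step uses the key directly as the serviceaccount name, which is why the serviceaccount name must be used as the key when configuring argo-workflows-webhook-clients.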


5. Summary

This article covered the ways a Workflow can be triggered in Argo:

  • 1) Manual trigger: creating a Workflow by hand to run the pipeline
  • 2) Scheduled trigger: using a CronWorkflow to create Workflows automatically from a Cron expression
  • 3) Event: using the event api provided by argo-server together with a WorkflowEventBinding to create Workflows
  • 4) Webhook: effectively an extension of the Event approach. Event requests need a Token for authentication, while the Webhook approach uses argo-workflows-webhook-clients to configure the Secret used by each Webhook source, so the Event API can be registered as a Webhook endpoint in Github, Gitlab, and similar platforms.