����λ�ã���ҳ > �����̳� > �̳� > ArgoWorkflow�̳�(��)---Workflow �Ķ��ִ���ģʽ���ֶ�����ʱ�������¼�����
��һƪ���Ƿ�����argo-workflow �е� archive������ ��ˮ��GC����ˮ�߹鵵����־�鵵�ȹ��ܡ���ƪ��Ҫ���� Workflow �еļ��ִ�����ʽ�������ֶ���������ʱ������Event �¼������ȡ� 1. ���� Argo Workflows ����ˮ���ж��ִ�����ʽ�� �ֶ��������ֶ��ύһ
��һƪ���Ƿ�����argo-workflow �е� archive������ ��ˮ��GC����ˮ�߹鵵����־�鵵�ȹ��ܡ���ƪ��Ҫ���� Workflow �еļ��ִ�����ʽ�������ֶ���������ʱ������Event �¼������ȡ�
Argo Workflows ����ˮ���ж��ִ�����ʽ��
CronWorkflow
�����Ͼ���һ�� Workflow + Cron Spec��
����ϲο��� k8s �е� CronJob
һ���򵥵� CronWorkflow ���£�
# CronWorkflow that is scheduled every minute. Each run sleeps for 90 seconds,
# so a new run starts while the previous one is still active; with
# concurrencyPolicy "Replace" the previous run is stopped.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
      - name: whalesay
        container:
          image: alpine:3.6
          command: [sh, -c]
          args: ["date; sleep 90"]
apply һ�£����Կ������������� Workflow ����Ϊ
$cronWorkflowName-xxx
[root@lixd-argo workdir]# k get cwf
NAME AGE
test-cron-wf 116s
[root@lixd-argo workdir]# k get wf
NAME STATUS AGE MESSAGE
test-cron-wf-1711852560 Running 47s
���� template ������������
sleep 90s
��ˣ����������ʱ�϶��dz��� 60s �ģ��������õ� concurrencyPolicy Ϊ Replace ����� 60s �󣬵ڶ��� Workflow ��������������һ���ͻᱻֹͣ����
[root@lixd-argo workdir]# k get wf
NAME STATUS AGE MESSAGE
test-cron-wf-1711852560 Failed 103s Stopped with strategy 'Terminate'
test-cron-wf-1711852620 Running 43s
֧�ֵľ���������£�
// CronWorkflowSpec groups the settings of a CronWorkflow: the workflow to run
// (WorkflowSpec, WorkflowMetadata) and the cron-related options that control
// when it runs and how many finished runs are kept.
type CronWorkflowSpec struct {
// WorkflowSpec is the spec of the workflow to be run
WorkflowSpec WorkflowSpec `json:"workflowSpec" protobuf:"bytes,1,opt,name=workflowSpec,casttype=WorkflowSpec"`
// Schedule is a schedule to run the Workflow in Cron format
Schedule string `json:"schedule" protobuf:"bytes,2,opt,name=schedule"`
// ConcurrencyPolicy is the K8s-style concurrency policy that will be used
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
// Suspend is a flag that will stop new CronWorkflows from running if set to true
Suspend bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
// StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its
// original scheduled time if it is missed.
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,5,opt,name=startingDeadlineSeconds"`
// SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
// FailedJobsHistoryLimit is the number of failed jobs to be kept at a time
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
// Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
// WorkflowMetadata contains some metadata of the workflow to be run
WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
}
���ݿ��Է�Ϊ 3 ���֣�
WorkflowSpec �� WorkflowMetadata û̫�����𣬾Ͳ�׸���ˣ�����һ�� Cron Spec ��صļ����ֶΣ�
* * * * *
ÿ���Ӵ���һ��
�󲿷��ֶκ� K8s CronJob һ��
# CronWorkflow whose generated Workflows carry extra metadata:
# spec.workflowMetadata sits next to spec.workflowSpec, and its labels are
# attached to every Workflow created from this CronWorkflow.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: my-cron
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
      - name: whalesay
        container:
          image: alpine:3.6
          command: [sh, -c]
          args: ["date; sleep 10"]
  workflowMetadata:
    labels:
      from: cron
������ metadata������һ��
[root@lixd-argo workdir]# k get wf my-cron-1711853400 -oyaml|grep labels -A 1
labels:
from: cron
���Կ��������������� Workflow ȷʵЯ�����ˣ��� CronWorkflow ��ָ���� label��
argo �ṩ��һ�� Event API��
/api/v1/events/{namespace}/{discriminator}
,�� API ���Խ������� json ���ݡ�
ͨ�� event API ���Դ��� Workflow �������� Webhook��
��������������
# Post a JSON event to the "argo" namespace without a discriminator.
curl https://localhost:2746/api/v1/events/argo/ \
  --header "Authorization: $ARGO_TOKEN" \
  --data '{"message": "hello"}'
����������
# Post a JSON event to the "argo" namespace under the discriminator
# "my-discriminator".
curl https://localhost:2746/api/v1/events/argo/my-discriminator \
  --header "Authorization: $ARGO_TOKEN" \
  --data '{"message": "hello"}'
���� RBAC ��ض���,role��rolebinding��sa������ role ֻ��Ҫ�ṩ��СȨ�޼��ɡ�
ֱ�Ӵ����� default �����ռ�
# Create the Role "test" with the minimum permissions needed to submit
# workflows from events: list WorkflowEventBindings, read WorkflowTemplates,
# create Workflows. The quoted delimiter keeps the manifest literal.
kubectl apply -f - <<'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: test
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workfloweventbindings
    verbs:
      - list
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtemplates
    verbs:
      - get
  - apiGroups:
      - argoproj.io
    resources:
      - workflows
    verbs:
      - create
EOF
serviceaccount �� rolebinding
# Create the service account "test" and bind the Role "test" to it
# (service account referenced as <namespace>:<name>).
kubectl create serviceaccount test
kubectl create rolebinding test \
  --role=test \
  --serviceaccount=default:test
Ȼ�󴴽�һ�� Secret
# Create a service-account token Secret for the "test" service account.
# The annotation names the service account the token is issued for; the
# Secret name matches the one queried when building ARGO_TOKEN.
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Secret
metadata:
  name: test.service-account-token
  annotations:
    kubernetes.io/service-account.name: test
type: kubernetes.io/service-account-token
EOF
���Ϳ��Բ�ѯ Secret ���� Token ��
# Read the base64-encoded token from the Secret, decode it and prefix it with
# "Bearer " so it can be used directly as an Authorization header value.
ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"
# Quoted so the value is printed verbatim (no word splitting or globbing).
printf '%s\n' "$ARGO_TOKEN"
Bearer ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNkltS...
���ԣ��ܷ�����ʹ��
# Look up the cluster IP of the argo-server Service, then list the
# WorkflowEventBindings of the "default" namespace to check that the token works.
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
curl "http://${ARGO_SERVER}:2746/api/v1/workflow-event-bindings/default" \
  -H "Authorization: $ARGO_TOKEN"
Ϊ�˽��� Event�����Դ��� WorkflowEventBinding ���󣬾������£�
# WorkflowEventBinding: when an incoming event satisfies spec.event.selector,
# a Workflow is submitted from the referenced WorkflowTemplate, with the
# "message" parameter taken from the event payload.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowEventBinding
metadata:
  name: event-consumer
spec:
  event:
    # metadata header name must be lowercase to match in selector
    selector: payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
  submit:
    workflowTemplateRef:
      name: my-wf-tmple
    arguments:
      parameters:
        - name: message
          valueFrom:
            event: payload.message
spec.event ָ���˸� Binding �����ƥ���յ��� Event������������������ǣ�
���ƥ�����ʹ�� submit ����ָ�������ݴ��� Workflow��
���ڴ������� Workflow ������ my-wf-tmple ������,�ȴ������ Template
# WorkflowTemplate referenced by the event binding. The "main" template takes
# the workflow-level "message" parameter as its input and prints it with cowsay.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: my-wf-tmple
spec:
  templates:
    - name: main
      inputs:
        parameters:
          - name: message
            value: "{{workflow.parameters.message}}"
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]
  entrypoint: main
������ǾͿ��Է��� API ������ event ʵ�� Workflow �Ĵ���
# Send an event to the "default" namespace under "my-discriminator". The
# X-Argo-E2E header and the non-empty message are what the binding's selector
# checks for.
curl "${ARGO_SERVER}:2746/api/v1/events/default/my-discriminator" \
  -H "Authorization: $ARGO_TOKEN" \
  -H "X-Argo-E2E: true" \
  -d '{"message": "hello events"}'
����һ�£�
{}[root@lixd-argo workdir]# curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
> -H "Authorization: $ARGO_TOKEN" \
> -H "X-Argo-E2E: true" \
> -d '{"message": "hello events"}'
{}[root@lixd-argo workdir]# k get wf
NAME STATUS AGE MESSAGE
my-wf-tmple-ea81n Running 5s
[root@lixd-argo workdir]# k get wf my-wf-tmple-ea81n -oyaml|grep parameters -A 5
parameters:
- name: message
value: hello events
���Կ�����Workflow �Ѿ����������ˣ����Ҳ���Ҳ�����Ƿ�����ʱ���� hello events��
Ĭ������� argo-server ����ͬʱ���� 64 ���¼����ٶ�ͻ�ֱ�ӷ��� 503 �ˣ�����ͨ�����²������е�����
ǰ�� Event �½��ᵽ�˿���ͨ������ HTTP ����ķ�ʽ���������� event �� Workflow��������Ҫ�ͻ����ṩ AuthToken��
�������ˣ�����һЩ����ָ�� Token �Ŀͻ�����˵�ͱȽ��鷳�ˣ����� Github��Gitlab �� Git �ֿ⣬���������� Webhook�����յ� commit ��ʱ����� Webhook ��������ˮ�ߡ�
��ʱ����Щ���͹���������϶���û�д� Token �ģ������Ҫ����������������֤����֤ argo ֻ�������� Github��Gitlab �ȵ�ƽ̨�� Webhook ����
��һ�� Token �� Event �½�һ�£��Ͳ���׸���ˣ���Ҫ�ǵڶ�����
��һ�������� RBAC ����׼���� Secret ֮��һ��ͻ��˶��ǽ��� Secret �е� Token��Ȼ����ϸ� Token �������󣬾���������
# Typical token-based client: resolve the argo-server address, decode the
# service-account token from its Secret, and send it as the Authorization
# header of the event request.
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
ARGO_TOKEN="Bearer $(kubectl get secret jenkins.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"
curl "https://${ARGO_SERVER}:2746/api/v1/events/default/" \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'
���ǣ����� Webhook �ͻ�����˵����û�취����ָ�� token �ģ������Ҫͨ��
argo-workflows-webhook-clients
���������� argo,�ĸ� Webhook ʹ���ĸ� Secret �е� token��
����һ����Ϊ
argo-workflows-webhook-clients
�� Secret�����ݴ����������ģ�
# Webhook client configuration. Each key under stringData is a service account
# name; its value is a small YAML document giving the webhook type and the
# shared secret used to verify requests of that type.
kind: Secret
apiVersion: v1
metadata:
  name: argo-workflows-webhook-clients
# The data keys must be the name of a service account.
stringData:
  # https://support.atlassian.com/bitbucket-cloud/docs/manage-webhooks/
  bitbucket.org: |
    type: bitbucket
    secret: "my-uuid"
  # https://confluence.atlassian.com/bitbucketserver/managing-webhooks-in-bitbucket-server-938025878.html
  bitbucketserver: |
    type: bitbucketserver
    secret: "shh!"
  # https://developer.github.com/webhooks/securing/
  github.com: |
    type: github
    secret: "shh!"
  # https://docs.gitlab.com/ee/user/project/integrations/webhooks.html
  gitlab.com: |
    type: gitlab
    secret: "shh!"
�� Github ���壬secret �������£�
������ Webhook ʱ������һ�� Secret ���ã�ʵ�ʾ���һ�������ַ��������ʲô�����ԡ�
���� Github ���� Webhook ����ʱ�ͻ�Я������� Secret ��Ϣ��Argo �յ���͸���
argo-workflows-webhook-clients
�� Secret �����õ� type=github �� secret �ֶν��жԱȣ����ƥ���Ͼʹ���������ͺ��Ը�����
�����ƥ���ϾʹӶ�Ӧ�� Serviceaccount �н��� Token ��Ϊ Authorization ��Ϣ��
Webhook ��һ�飬�ٷ��ĵ����Ǻ���ϸ��һ�ʴ����ˣ���˷�����Դ�롣
����߼���һ�� Interceptor ����ʽ���֣��������� Event API ���á¾ï¿½ï¿½ï¿½ï¿½ï¿½ß¼ï¿½ï¿½ï¿½ ����Ϊû��Я�� Authorization ���������� Authorization ��Ϣ ��
// Interceptor returns HTTP middleware for webhook requests. Each request is
// first passed to addWebhookAuthorization; if that succeeds the request is
// forwarded to next, otherwise the error is logged and a generic 403 response
// is written.
func Interceptor(client kubernetes.Interface) func(w http.ResponseWriter, r *http.Request, next http.Handler) {
	return func(w http.ResponseWriter, r *http.Request, next http.Handler) {
		if authErr := addWebhookAuthorization(r, client); authErr != nil {
			log.WithError(authErr).Error("Failed to process webhook request")
			w.WriteHeader(http.StatusForbidden)
			// the response body is deliberately generic: the real cause is
			// only logged, because exposing it could help an attacker
			_, _ = w.Write([]byte(`{"message": "failed to process webhook request"}`))
			return
		}
		next.ServeHTTP(w, r)
	}
}
���� addWebhookAuthorization ����������֤��Ϣ��
// addWebhookAuthorization tries to match the request against the configured
// webhook clients and, on a match, sets its Authorization header to a bearer
// token.
//
// Only POST requests whose path starts with pathPrefix and which carry no
// Authorization header are considered; any other request is left untouched and
// nil is returned. The namespace is the first path segment after pathPrefix.
// The "argo-workflows-webhook-clients" secret of that namespace is read; each
// of its entries is keyed by a service account name and holds a YAML
// webhookClient. The parser registered in webhookParsers for the client's type
// is called with the client's secret and the request. For the first entry whose
// parser accepts the request, the token secret of that service account is
// fetched and its "token" value is used as the bearer token.
//
// Returns nil when the request is not applicable or no entry matches (the
// request is then left without an Authorization header), and an error when the
// body cannot be read, an entry cannot be parsed, or a Kubernetes lookup fails.
func addWebhookAuthorization(r *http.Request, kube kubernetes.Interface) error {
	// try and exit quickly before we do any API calls
	if r.Method != "POST" || len(r.Header["Authorization"]) > 0 || !strings.HasPrefix(r.URL.Path, pathPrefix) {
		return nil
	}
	parts := strings.SplitN(strings.TrimPrefix(r.URL.Path, pathPrefix), "/", 2)
	if len(parts) != 2 {
		return nil
	}
	namespace := parts[0]
	secretsInterface := kube.CoreV1().Secrets(namespace)
	ctx := r.Context()
	webhookClients, err := secretsInterface.Get(ctx, "argo-workflows-webhook-clients", metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get webhook clients: %w", err)
	}
	// we need to read the request body to check the signature, but we still need it for the GRPC request,
	// so read it all now, and then reinstate when we are done
	buf, err := io.ReadAll(r.Body)
	if err != nil {
		// a partially read body cannot be verified, so reject instead of
		// checking signatures against truncated data
		return fmt.Errorf("failed to read request body: %w", err)
	}
	defer func() { r.Body = io.NopCloser(bytes.NewBuffer(buf)) }()
	serviceAccountInterface := kube.CoreV1().ServiceAccounts(namespace)
	for serviceAccountName, data := range webhookClients.Data {
		// every parser gets a fresh reader over the full body
		r.Body = io.NopCloser(bytes.NewBuffer(buf))
		client := &webhookClient{}
		if err := yaml.Unmarshal(data, client); err != nil {
			return fmt.Errorf("failed to unmarshal webhook client \"%s\": %w", serviceAccountName, err)
		}
		log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Debug("Attempting to match webhook request")
		parser := webhookParsers[client.Type]
		if parser == nil {
			// no parser registered for this type: calling it would panic,
			// so skip the entry and keep trying the others
			log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Warn("Skipping webhook client with unknown type")
			continue
		}
		if !parser(client.Secret, r) {
			continue
		}
		log.WithField("serviceAccountName", serviceAccountName).Debug("Matched webhook request")
		serviceAccount, err := serviceAccountInterface.Get(ctx, serviceAccountName, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to get service account \"%s\": %w", serviceAccountName, err)
		}
		// keep the name so the error message can report it; the returned
		// secret object is not usable when the lookup fails
		tokenSecretName := secrets.TokenNameForServiceAccount(serviceAccount)
		tokenSecret, err := secretsInterface.Get(ctx, tokenSecretName, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to get token secret \"%s\": %w", tokenSecretName, err)
		}
		r.Header["Authorization"] = []string{"Bearer " + string(tokenSecret.Data["token"])}
		return nil
	}
	return nil
}
�����������£�
��������ֱ��ʹ�� key ��Ϊ serviceaccount����Ҳ����Ϊʲô����
argo-workflows-webhook-clients
ʱ��Ҫ�� serviceaccount ������Ϊ key��
��ArgoWorkflow ϵ�С� ���������У��������ںš� ̽����Ô�� �����ģ��Ķ��������¡�
������Ҫ������ Argo �е� Workflow �ļ��ִ�����ʽ��
argo-workflows-webhook-clients
���úò�ͬ��Դ�� Webhook ʹ�õ� Secret ��ʵ����֤�������Ϳ��԰� Event API ���� Webhook �˵� ���õ� Github��Gitlab �Ȼ����ˡ�
ʹ��Blender���ɳ���ģ��
�Ķ�ȫ����������ERA5�����ط���
�Ķ�Xpath���������﷨
�Ķ�����ѧϰ�������繹�����£�
�Ķ���ΪMateƷ��ʢ�䣺HarmonyOS NEXT�ӳ�����Ϸ���ܵõ�����ͷ�
�Ķ�ʵ�ֶ��󼯺���DataTable���໥ת��
�Ķ�Ӳ�̵Ļ���֪ʶ��ѡ��ָ��
�Ķ�������й��ƶ��ı�ͼ��ײ�
�Ķ�����NEXTԪ�����������ѿ����ϼ���Ʒ
�Ķ��ᳲ���С������������Ƽ��رշ���
�Ķ������ArcMap�����н���դ��ͼ���ز�������
�Ķ��㷨�����ݽṹ 1 - ģ��
�Ķ���Ѷ�����߿ͷ���Ӫ��ϵͳ����
�Ķ���Ѷ��Ƶҹ��ģʽ���ý̳�
�Ķ����ں���NEXT��Ѫ���Ŵ���������������
�Ķ�5. Spring Cloud OpenFeign ����ʽ WebService �ͻ��˵ij���ϸʹ��
�Ķ�Java����ģʽ����̬�����Ͷ�̬�����ĶԱȷ���
�Ķ�Win11�ʼDZ����Զ�����Ӧ�õ���ɫ����ʾ����
�Ķ�˼�� V1.5.6 ��׿��
��ս�귨 V7.5.0 ��׿��
У��������������׵������� V1.0 ��׿��
��˸֮�� V1.9.7 ��׿��
������Ե����� v1.0.4 ��׿��
������֮ŠV5.2.3 ��׿��
��������������Դ V1.0 ��׿��
���֮Ϣ V1.0 ��׿��
��ħ������������䣩 V1.0 ��׿��
���ں�������ϵ�����������������վ�����������������Ƽ�����
Ƶ�� ����Ƶ��������ר������������׿�������app����
�Ƽ� ��Ô���������°��������ܿ������ز���
���� ����ɫ������������ ���������ս������������
ɨ��ά�����������ֻ��汾��
ɨ��ά����������΢�Ź��ںţ�
��վ�������������������ϴ��������ַ���İ�Ȩ���뷢�ʼ�[email protected]
��ICP��2022002427��-10 �湫��������43070202000427��© 2013~2025 haote.com ������