Skip to content

Commit 9a44be8

Browse files
committed
Adding first Event Bridge Sink Control Plane work.
Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
1 parent 427c91a commit 9a44be8

File tree

11 files changed

+202
-37
lines changed

11 files changed

+202
-37
lines changed

config/core/deployments/controller.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@ spec:
135135
key: aws-sns-sink
136136
name: eventing-integrations-images
137137

138+
- name: INTEGRATION_SINK_AWS_EVENTBRIDGE_IMAGE
139+
valueFrom:
140+
configMapKeyRef:
141+
key: aws-eventbridge-sink
142+
name: eventing-integrations-images
143+
144+
138145
## Adapter settings
139146
# - name: K_LOGGING_CONFIG
140147
# value: ''

config/core/resources/integrationsink.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,46 @@ spec:
111111
aws:
112112
type: object
113113
properties:
114+
eventbridge:
115+
type: object
116+
properties:
117+
arn:
118+
type: string
119+
title: Eventbus Name
120+
description: The Eventbridge Eventbus name or Amazon Resource Name (ARN).
121+
resourcesArn:
122+
type: string
123+
title: Resources ARN
124+
description: The ARN of resources related to the AWS event (e.g. arn:aws:s3:eu-east-1:000000000001:test).
125+
eventSourcePrefix:
126+
type: string
127+
title: Event Source Prefix
128+
description: The event source prefix set for all events sent to the eventbus.
129+
eventSource:
130+
type: string
131+
title: Event Source
132+
description: The event source related to the AWS event (e.g. aws.s3).
133+
detailType:
134+
type: string
135+
title: Detail Type
136+
description: The event detail type related to the AWS event (e.g. Object Created).
137+
region:
138+
type: string
139+
title: AWS Region
140+
description: The AWS region to access.
141+
uriEndpointOverride:
142+
type: string
143+
title: Overwrite Endpoint URI
144+
description: The overriding endpoint URI. To use this option, you must
145+
also select the `overrideEndpoint` option.
146+
overrideEndpoint:
147+
type: boolean
148+
title: Endpoint Overwrite
149+
description: Select this option to override the endpoint URI. To use
150+
this option, you must also provide a URI for the `uriEndpointOverride`
151+
option.
152+
default: false
153+
114154
s3:
115155
type: object
116156
properties:

docs/eventing-api.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6759,12 +6759,23 @@ JobSinkStatus
67596759
<tbody>
67606760
<tr>
67616761
<td>
6762+
<code>eventbridge</code><br/>
6763+
<em>
6764+
knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSEventbridge
6765+
</em>
6766+
</td>
6767+
<td>
6768+
</td>
6769+
</tr>
6770+
<tr>
6771+
<td>
67626772
<code>s3</code><br/>
67636773
<em>
67646774
knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSS3
67656775
</em>
67666776
</td>
67676777
<td>
6778+
<p>EB sink configuration</p>
67686779
</td>
67696780
</tr>
67706781
<tr>

pkg/apis/common/integration/v1alpha1/aws.go

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,45 +32,54 @@ type AWSCommon struct {
3232
}
3333

3434
type AWSS3 struct {
35-
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
36-
Arn string `json:"arn,omitempty" camel:"BUCKET_NAME_OR_ARN"` // S3 ARN
37-
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
38-
MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
39-
DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects
40-
DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects
41-
DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects
42-
AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket
43-
Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search
44-
IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body
45-
ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access
46-
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
47-
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request
35+
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
36+
Arn string `json:"arn,omitempty" camel:"BUCKET_NAME_OR_ARN"` // S3 ARN
37+
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
38+
MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
39+
DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects
40+
DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects
41+
DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects
42+
AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket
43+
Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search
44+
IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body
45+
ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access
46+
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
47+
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request
4848
}
4949

5050
type AWSSQS struct {
51-
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
52-
Arn string `json:"arn,omitempty" camel:"QUEUE_NAME_OR_ARN"` // SQS ARN
53-
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
54-
AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
55-
Host string `json:"host" camel:"AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
56-
Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
57-
QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
58-
Greedy bool `json:"greedy" default:"false"` // Greedy scheduler
59-
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
60-
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10)
61-
WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages
62-
VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds
51+
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
52+
Arn string `json:"arn,omitempty" camel:"QUEUE_NAME_OR_ARN"` // SQS ARN
53+
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
54+
AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
55+
Host string `json:"host" camel:"AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
56+
Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
57+
QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
58+
Greedy bool `json:"greedy" default:"false"` // Greedy scheduler
59+
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
60+
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10)
61+
WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages
62+
VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds
6363
}
6464

6565
type AWSDDBStreams struct {
66-
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
67-
Table string `json:"table,omitempty"` // The name of the DynamoDB table
68-
StreamIteratorType string `json:"streamIteratorType,omitempty" default:"FROM_LATEST"` // Defines where in the DynamoDB stream to start getting records
69-
Delay int `json:"delay,omitempty" default:"500"` // Delay in milliseconds before the next poll from the database
66+
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
67+
Table string `json:"table,omitempty"` // The name of the DynamoDB table
68+
StreamIteratorType string `json:"streamIteratorType,omitempty" default:"FROM_LATEST"` // Defines where in the DynamoDB stream to start getting records
69+
Delay int `json:"delay,omitempty" default:"500"` // Delay in milliseconds before the next poll from the database
7070
}
7171

7272
type AWSSNS struct {
73-
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
74-
Arn string `json:"arn,omitempty" camel:"TOPIC_NAME_OR_ARN"` // SNS ARN
75-
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
73+
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
74+
Arn string `json:"arn,omitempty" camel:"TOPIC_NAME_OR_ARN"` // SNS ARN
75+
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
76+
}
77+
78+
type AWSEventbridge struct {
79+
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
80+
Arn string `json:"arn,omitempty" camel:"EVENTBUS_NAME_OR_ARN"` // EB ARN
81+
ResourcesArn string `json:"resourcesArn,omitempty"` // EB Resources ARN
82+
EventSourcePrefix string `json:"eventSourcePrefix,omitempty"` // EB Event Source Prefix
83+
EventSource string `json:"eventSource,omitempty"` // EB Event Source
84+
DetailType string `json:"detailType,omitempty"` // EB Detail Type
7685
}

pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/sinks/v1alpha1/integration_sink_types.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,11 @@ type Log struct {
7474
}
7575

7676
type Aws struct {
77-
S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 sink configuration
78-
SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS sink configuration
79-
SNS *v1alpha1.AWSSNS `json:"sns,omitempty"` // SNS sink configuration
80-
Auth *v1alpha1.Auth `json:"auth,omitempty"`
77+
EVENTBRIDGE *v1alpha1.AWSEventbridge `json:"eventbridge,omitempty"` // EB sink configuration
78+
S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 sink configuration
79+
SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS sink configuration
80+
SNS *v1alpha1.AWSSNS `json:"sns,omitempty"` // SNS sink configuration
81+
Auth *v1alpha1.Auth `json:"auth,omitempty"`
8182
}
8283

8384
type IntegrationSinkStatus struct {

pkg/apis/sinks/v1alpha1/integration_sink_types_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ func TestAWS(t *testing.T) {
9898
if sns.Region != "eu-north-1" {
9999
t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", sns.Region)
100100
}
101+
102+
eb := v1alpha1.AWSEventbridge{
103+
AWSCommon: v1alpha1.AWSCommon{
104+
Region: "eu-north-1",
105+
},
106+
Arn: "example-event-bus",
107+
}
108+
109+
if eb.Region != "eu-north-1" {
110+
t.Errorf("AWSEventbridge.Region = %v, want 'eu-north-1'", sns.Region)
111+
}
101112
}
102113

103114
// TestAuthFieldAccess tests the HasAuth method and field access in Auth struct

pkg/apis/sinks/v1alpha1/integration_sink_validation.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
4545
if spec.Aws.SNS != nil {
4646
sinkSetCount++
4747
}
48+
if spec.Aws.EVENTBRIDGE != nil {
49+
sinkSetCount++
50+
}
4851
}
4952

5053
// Validate that only one sink field is set
@@ -56,7 +59,7 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
5659

5760
// Only perform AWS-specific validation if exactly one AWS sink is configured
5861
if sinkSetCount == 1 && spec.Aws != nil {
59-
if spec.Aws.S3 != nil || spec.Aws.SQS != nil || spec.Aws.SNS != nil {
62+
if spec.Aws.S3 != nil || spec.Aws.SQS != nil || spec.Aws.SNS != nil || spec.Aws.EVENTBRIDGE != nil {
6063
// Check that AWS Auth is properly configured
6164
if !spec.Aws.Auth.HasAuth() {
6265
errs = errs.Also(apis.ErrMissingField("aws.auth.secret.ref.name"))
@@ -91,6 +94,15 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
9194
errs = errs.Also(apis.ErrMissingField("aws.sns.region"))
9295
}
9396
}
97+
// Additional validation for AWS Eventbridge required fields
98+
if spec.Aws.EVENTBRIDGE != nil {
99+
if spec.Aws.EVENTBRIDGE.Arn == "" {
100+
errs = errs.Also(apis.ErrMissingField("aws.eventbridge.arn"))
101+
}
102+
if spec.Aws.EVENTBRIDGE.Region == "" {
103+
errs = errs.Also(apis.ErrMissingField("aws.eventbridge.region"))
104+
}
105+
}
94106
}
95107

96108
return errs

pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,26 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
203203
},
204204
want: apis.ErrMissingField("aws.sns.arn"),
205205
},
206+
{
207+
name: "AWS Eventbridge sink without TopicNameOrArn (invalid)",
208+
spec: IntegrationSinkSpec{
209+
Aws: &Aws{
210+
EVENTBRIDGE: &v1alpha1.AWSEventbridge{
211+
AWSCommon: v1alpha1.AWSCommon{
212+
Region: "us-east-1",
213+
},
214+
},
215+
Auth: &v1alpha1.Auth{
216+
Secret: &v1alpha1.Secret{
217+
Ref: &v1alpha1.SecretReference{
218+
Name: "aws-secret",
219+
},
220+
},
221+
},
222+
},
223+
},
224+
want: apis.ErrMissingField("aws.eventbridge.arn"),
225+
},
206226
{
207227
name: "no sink type specified (invalid)",
208228
spec: IntegrationSinkSpec{},
@@ -255,6 +275,24 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
255275
},
256276
want: apis.ErrMissingField("aws.s3.region"),
257277
},
278+
{
279+
name: "AWS Eventbridge sink without region (invalid)",
280+
spec: IntegrationSinkSpec{
281+
Aws: &Aws{
282+
EVENTBRIDGE: &v1alpha1.AWSEventbridge{
283+
Arn: "example-bus",
284+
},
285+
Auth: &v1alpha1.Auth{
286+
Secret: &v1alpha1.Secret{
287+
Ref: &v1alpha1.SecretReference{
288+
Name: "aws-secret",
289+
},
290+
},
291+
},
292+
},
293+
},
294+
want: apis.ErrMissingField("aws.eventbridge.region"),
295+
},
258296
}
259297

260298
for _, test := range tests {

pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)