Skip to content

Commit 22bae12

Browse files
authored
Restore job acquisition flow (#90)
1 parent a0708d5 commit 22bae12

File tree

6 files changed

+401
-0
lines changed

6 files changed

+401
-0
lines changed

client.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,14 @@ func parseRunnerScaleSetMessageResponse(respBody io.Reader) (*RunnerScaleSetMess
510510
}
511511

512512
switch messageType.MessageType {
513+
case MessageTypeJobAvailable:
514+
var jobAvailable JobAvailable
515+
if err := json.Unmarshal(msg, &jobAvailable); err != nil {
516+
return nil, fmt.Errorf("failed to decode job available: %w", err)
517+
}
518+
519+
message.JobAvailableMessages = append(message.JobAvailableMessages, &jobAvailable)
520+
513521
case MessageTypeJobAssigned:
514522
var jobAssigned JobAssigned
515523
if err := json.Unmarshal(msg, &jobAssigned); err != nil {

listener/listener.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (c *Config) Validate() error {
4949
type Client interface {
5050
GetMessage(ctx context.Context, lastMessageID, maxCapacity int) (*scaleset.RunnerScaleSetMessage, error)
5151
DeleteMessage(ctx context.Context, messageID int) error
52+
AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error)
5253
Session() scaleset.RunnerScaleSetSession
5354
}
5455

@@ -210,6 +211,12 @@ func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scale
210211
return fmt.Errorf("failed to delete message: %w", err)
211212
}
212213

214+
if len(msg.JobAvailableMessages) > 0 {
215+
if err := l.acquireAvailableJobs(ctx, msg.JobAvailableMessages); err != nil {
216+
return fmt.Errorf("failed to acquire available jobs: %w", err)
217+
}
218+
}
219+
213220
for _, jobStarted := range msg.JobStartedMessages {
214221
l.metricsRecorder.RecordJobStarted(jobStarted)
215222
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
@@ -232,6 +239,23 @@ func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scale
232239
return nil
233240
}
234241

242+
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*scaleset.JobAvailable) error {
243+
ids := make([]int64, 0, len(jobsAvailable))
244+
for _, job := range jobsAvailable {
245+
ids = append(ids, job.RunnerRequestID)
246+
}
247+
248+
l.logger.Info("Acquiring jobs", slog.Int("count", len(ids)))
249+
250+
acquired, err := l.client.AcquireJobs(ctx, ids)
251+
if err != nil {
252+
return fmt.Errorf("acquiring jobs: %w", err)
253+
}
254+
255+
l.logger.Info("Jobs acquired", slog.Int("count", len(acquired)))
256+
return nil
257+
}
258+
235259
func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) {
236260
l.latestStatistics = msg
237261
l.metricsRecorder.RecordStatistics(msg)

listener/mocks_test.go

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

session_client.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,67 @@ func (c *MessageSessionClient) Session() RunnerScaleSetSession {
234234
return *c.session
235235
}
236236

237+
// AcquireJobs acquires the given job request IDs from the runner scale set.
238+
// If the current session token is expired, it refreshes the session and tries one more time.
239+
func (c *MessageSessionClient) AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
240+
c.mu.Lock()
241+
defer c.mu.Unlock()
242+
243+
ids, err := c.acquireJobs(ctx, requestIDs)
244+
if err == nil {
245+
return ids, nil
246+
}
247+
248+
if !errors.Is(err, MessageQueueTokenExpiredError) {
249+
return nil, fmt.Errorf("failed to acquire jobs: %w", err)
250+
}
251+
252+
if err := c.refreshMessageSession(ctx); err != nil {
253+
return nil, fmt.Errorf("failed to refresh message session: %w", err)
254+
}
255+
256+
return c.acquireJobs(ctx, requestIDs)
257+
}
258+
259+
func (c *MessageSessionClient) acquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
260+
body, err := json.Marshal(requestIDs)
261+
if err != nil {
262+
return nil, fmt.Errorf("failed to marshal request ids: %w", err)
263+
}
264+
265+
path := fmt.Sprintf("/%s/%d/acquirejobs", scaleSetEndpoint, c.scaleSetID)
266+
267+
c.innerClient.mu.Lock()
268+
req, err := c.innerClient.newActionsServiceRequest(ctx, http.MethodPost, path, bytes.NewBuffer(body))
269+
c.innerClient.mu.Unlock()
270+
if err != nil {
271+
return nil, fmt.Errorf("failed to create acquire jobs request: %w", err)
272+
}
273+
274+
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.session.MessageQueueAccessToken))
275+
276+
resp, err := c.commonClient.do(req)
277+
if err != nil {
278+
return nil, fmt.Errorf("failed to issue acquire jobs request: %w", err)
279+
}
280+
defer resp.Body.Close()
281+
282+
if resp.StatusCode == http.StatusUnauthorized {
283+
return nil, newRequestResponseError(req, resp, MessageQueueTokenExpiredError)
284+
}
285+
286+
if resp.StatusCode != http.StatusOK {
287+
return nil, newRequestResponseError(req, resp, fmt.Errorf("unexpected status code %s", resp.Status))
288+
}
289+
290+
var result acquireJobsResponse
291+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
292+
return nil, newRequestResponseError(req, resp, fmt.Errorf("failed to decode acquire jobs response: %w", err))
293+
}
294+
295+
return result.Value, nil
296+
}
297+
237298
func (c *MessageSessionClient) doSessionRequest(ctx context.Context, method, path string, requestData io.Reader, expectedResponseStatusCode int, responseUnmarshalTarget any) error {
238299
c.innerClient.mu.Lock()
239300
defer c.innerClient.mu.Unlock()

0 commit comments

Comments
 (0)