Базовый рабочий механизм представляет собой следующее: запланированное событие добавляется в базу данных, из которой оно будет выполняться в определенное время. Другая задача будет запускаться регулярно, чтобы проверить, не истекло ли какое-либо событие в базе данных. Если да, то запустится событие polling.

Детали реализации

Начнем с создания схемы базы данных в postgresql, которая будет использоваться для хранения событий:

CREATE TABLE IF NOT EXISTS "public"."jobs" (   
   "id"      SERIAL PRIMARY KEY,   
   "name"    varchar(50) NOT NULL,   
   "payload" text,   
   "runAt"   TIMESTAMP NOT NULL  
)

Теперь определим структуру данных для следующих элементов.

  • Event  —  запланированная задача.
  • Listeners  —  список слушателей событий.
  • ListenFunc  —  функция, которая будет вызываться при запуске события.
// Listeners прикрепляет слушателей событий
type Listeners map[string]ListenFunc

// Функция ListenFunc, которая прослушивает события
type ListenFunc func(string)

// Структура события
type Event struct {
	ID      uint
	Name    string
	Payload string
}

Также определим структуру Scheduler, которая будет планировать события и запускать слушателей:

// Структура данных планировщика
type Scheduler struct {
	db        *sql.DB
	listeners Listeners
}

// NewScheduler создает новый планировщик
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
	return Scheduler{
		db:        db,
		listeners: listeners,
	}
}

Здесь мы создаем новый планировщик, передавая ему экземпляр sql.DB и начальных слушателей.

Теперь нужно добавить реализацию функции планирования, которая будет помещать событие в таблицу jobs:

// Schedule планирует предоставленные события
func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
	log.Print("🚀 Scheduling event ", event, " to run at ", runAt)
	_, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt") VALUES ($1, $2, $3)`, event, payload, runAt)
	if err != nil {
		log.Print("schedule insert error: ", err)
	}
}

// AddListener добавляет функцию listener в Listeners
func (s Scheduler) AddListener(event string, listenFunc ListenFunc) {
	s.listeners[event] = listenFunc
}

В функции AddListener мы просто присваиваем функцию listener к имени события.

Мы завершили первый этап: добавили событие в таблицу job. Теперь нужно извлечь устаревшие задачи из базы данных, выполнить их, а затем удалить.

Реализация функции ниже показывает, как можно выявить устаревшие события в таблице, а также сериализацию события в структуру Event:

// checkDueEvents проверяет и возвращает соответствующие события
func (s Scheduler) checkDueEvents() []Event {
	events := []Event{}
	rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
	if err != nil {
		log.Print("💀 error: ", err)
		return nil
	}
	for rows.Next() {
		evt := Event{}
		rows.Scan(&evt.ID, &evt.Name, &evt.Payload)
		events = append(events, evt)
	}
	return events
}

Переходим ко второму этапу: вызываем зарегистрированных слушателей событий из базы данных:

// callListeners вызывает слушателя определенного события
func (s Scheduler) callListeners(event Event) {
	eventFn, ok := s.listeners[event.Name]
	if ok {
		go eventFn(event.Payload)
		_, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
		if err != nil {
			log.Print("💀 error: ", err)
		}
	} else {
		log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
	}

}

Здесь мы проверяем наличие функции event. Если она прикреплена, то мы вызываем функцию event listener. Строки 6–9 удаляют задачу, поэтому при повторном поиске по базе данных слушатель не будет найден.

И наконец, переходим к финальному этапу: проверяем, истекло ли какое-либо событие в заданный интервал времени. Для запуска задач в определенный период используем функцию ticker библиотеки time, которая предоставит канал, получающий новый тик в заданном интервале.

// CheckEventsInInterval проверяет события в заданном интервале
func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) {
	ticker := time.NewTicker(duration)
	go func() {
		for {
			select {
			case <-ctx.Done():
				ticker.Stop()
				return
			case <-ticker.C:
				log.Println("⏰ Ticks Received...")
				events := s.checkDueEvents()
				for _, e := range events {
					s.callListeners(e)
				}
			}

		}
	}()
}

Здесь мы проверяем, закрыт ли контекст, или же канал ticker получает тики. После получения тиков просматриваем соответствующие события, а затем вызываем слушателей для всех событий.

Теперь мы будем использовать все функции, определенные ранее в файле main.go:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/dipeshdulal/event-scheduling/customevents"
)

var eventListeners = Listeners{
	"SendEmail": customevents.SendEmail,
	"PayBills":  customevents.PayBills,
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	db := initDBConnection()

	scheduler := NewScheduler(db, eventListeners)
	scheduler.CheckEventsInInterval(ctx, time.Minute)

	scheduler.Schedule("SendEmail", "mail: [email protected]", time.Now().Add(1*time.Minute))
	scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))

	go func() {
		for range interrupt {
			log.Println("\n❌ Interrupt received closing...")
			cancel()
		}
	}()

	<-ctx.Done()
}

В строках 13–16 мы прикрепляем слушателей к имени событий SendEmail и PayBills, чтобы эти функции вызывались при появлении новой задачи.

В строках 22 и 32–37 мы прикрепляем канал прерывания (interrupt) с помощью os.Interrupt. Когда прерывание в программе выполняется, мы отменяем контекст в строке 19.

В строках 26–30 мы определяем планировщик событий, запускаем функцию polling и планируем выполнение события SendEmail через минуту, а PayBills  —  через две минуты.

Вывод данной программы выглядит следующим образом:

2021/01/16 11:58:49 💾 Seeding database with table...
2021/01/16 11:58:49 🚀 Scheduling event SendEmail to run at 2021-01-16 11:59:49.344904505 +0545 +0545 m=+60.004623549
2021/01/16 11:58:49 🚀 Scheduling event PayBills to run at 2021-01-16 12:00:49.34773798 +0545 +0545 m=+120.007457039
2021/01/16 11:59:49 ⏰ Ticks Received...
2021/01/16 11:59:49 📨 Sending email with data:  mail: [email protected]
2021/01/16 12:00:49 ⏰ Ticks Received...
2021/01/16 12:01:49 ⏰ Ticks Received...
2021/01/16 12:01:49 💲 Pay me a bill:  paybills: $4,000 bill
2021/01/16 12:02:49 ⏰ Ticks Received...
2021/01/16 12:03:49 ⏰ Ticks Received...
^C2021/01/16 12:03:57 
❌ Interrupt received closing...

Мы видим, что событие SendEmail было выполнено через минуту, а PayBills  —  в следующую минуту.

Таким образом, мы создали базовую систему планирования событий, которая выполняет задачи в определенный временной интервал. Здесь можно найти полный пример кода.

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи Dipesh Dulal: Building Basic Event Scheduler in Go

Предыдущая статьяКак создать адаптивную галерею для интернет-магазина
Следующая статья4 типа архитектуры программного обеспечения