From 1a82d6da44b14b988a9046ed2d8d0eb014275220 Mon Sep 17 00:00:00 2001 From: kolaente Date: Mon, 28 Aug 2023 13:26:40 +0200 Subject: [PATCH] feat(tasks): add periodic resync of updated tasks to Typesense --- pkg/initialize/init.go | 1 + pkg/migration/20230828125443.go | 46 +++++++++++ pkg/models/typesense.go | 135 ++++++++++++++++++++++++++++++-- 3 files changed, 175 insertions(+), 7 deletions(-) create mode 100644 pkg/migration/20230828125443.go diff --git a/pkg/initialize/init.go b/pkg/initialize/init.go index aa1cd69306b..37211c0970f 100644 --- a/pkg/initialize/init.go +++ b/pkg/initialize/init.go @@ -89,6 +89,7 @@ func FullInit() { models.RegisterUserDeletionCron() models.RegisterOldExportCleanupCron() openid.CleanupSavedOpenIDProviders() + models.RegisterPeriodicTypesenseResyncCron() // Start processing events go func() { diff --git a/pkg/migration/20230828125443.go b/pkg/migration/20230828125443.go new file mode 100644 index 00000000000..eed1c9edcd2 --- /dev/null +++ b/pkg/migration/20230828125443.go @@ -0,0 +1,46 @@ +// Vikunja is a to-do list application to facilitate your life. +// Copyright 2018-2021 Vikunja and contributors. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public Licensee as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public Licensee for more details. +// +// You should have received a copy of the GNU Affero General Public Licensee +// along with this program. If not, see . + +package migration + +import ( + "src.techknowlogick.com/xormigrate" + "time" + "xorm.io/xorm" +) + +type typesenseSync20230828125443 struct { + Collection string `xorm:"not null"` + SyncStartedAt time.Time `xorm:"not null"` + SyncFinishedAt time.Time `xorm:"null"` +} + +func (typesenseSync20230828125443) TableName() string { + return "typesense_sync" +} + +func init() { + migrations = append(migrations, &xormigrate.Migration{ + ID: "20230828125443", + Description: "", + Migrate: func(tx *xorm.Engine) error { + return tx.Sync2(typesenseSync20230828125443{}) + }, + Rollback: func(tx *xorm.Engine) error { + return nil + }, + }) +} diff --git a/pkg/models/typesense.go b/pkg/models/typesense.go index 04d8742eb30..af72072fe24 100644 --- a/pkg/models/typesense.go +++ b/pkg/models/typesense.go @@ -17,16 +17,27 @@ package models import ( + "code.vikunja.io/api/pkg/cron" + "code.vikunja.io/api/pkg/log" + "fmt" + "time" + "xorm.io/xorm" + "code.vikunja.io/api/pkg/config" "code.vikunja.io/api/pkg/db" "code.vikunja.io/api/pkg/user" - "fmt" "github.com/typesense/typesense-go/typesense" "github.com/typesense/typesense-go/typesense/api" "github.com/typesense/typesense-go/typesense/api/pointer" ) +type TypesenseSync struct { + Collection string `xorm:"not null"` + SyncStartedAt time.Time `xorm:"not null"` + SyncFinishedAt time.Time `xorm:"null"` +} + var typesenseClient *typesense.Client func InitTypesense() { @@ -190,16 +201,39 @@ func ReindexAllTasks() (err error) { s := db.NewSession() defer s.Close() + currentSync := &TypesenseSync{ + Collection: "tasks", + SyncStartedAt: time.Now(), + } + _, err = s.Insert(currentSync) + if err != nil { + return err + } + err = s.Find(tasks) if err != nil { return err } + err = reindexTasks(s, tasks) + if err != nil { + return err + } + + currentSync.SyncFinishedAt = time.Now() + _, err = s.Where("collection = ?", "tasks"). + Cols("sync_finished_at"). + Update(currentSync) + return +} + +func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) { err = addMoreInfoToTasks(s, tasks, &user.User{ID: 1}) if err != nil { return err } + typesenseTasks := []interface{}{} for _, task := range tasks { searchTask := convertTaskToTypesenseTask(task) @@ -209,12 +243,18 @@ func ReindexAllTasks() (err error) { return err } - _, err = typesenseClient.Collection("tasks"). - Documents(). - Create(searchTask) - if err != nil { - return err - } + typesenseTasks = append(typesenseTasks, searchTask) + + } + + _, err = typesenseClient.Collection("tasks"). + Documents(). + Import(typesenseTasks, &api.ImportDocumentsParams{ + Action: pointer.String("upsert"), + BatchSize: pointer.Int(100), + }) + if err != nil { + return err } return nil @@ -301,3 +341,84 @@ func convertTaskToTypesenseTask(task *Task) *typesenseTask { return tt } + +func SyncUpdatedTasksIntoTypesense() (err error) { + tasks := make(map[int64]*Task) + + s := db.NewSession() + _ = s.Begin() + defer s.Close() + + lastSync := &TypesenseSync{} + has, err := s.Where("collection = ?", "tasks"). + Get(lastSync) + if err != nil { + _ = s.Rollback() + return err + } + + if !has { + log.Errorf("[Typesense Sync] No typesense sync stats yet, please run a full index via the CLI first") + _ = s.Rollback() + return + } + + currentSync := &TypesenseSync{SyncStartedAt: time.Now()} + _, err = s.Where("collection = ?", "tasks"). + Cols("sync_started_at", "sync_finished_at"). + Update(currentSync) + if err != nil { + _ = s.Rollback() + return + } + + err = s. + Where("updated >= ?", lastSync.SyncStartedAt). + Find(tasks) + if err != nil { + _ = s.Rollback() + return + } + + if len(tasks) > 0 { + log.Debugf("[Typesense Sync] Updating %d tasks", len(tasks)) + + err = reindexTasks(s, tasks) + if err != nil { + _ = s.Rollback() + return + } + } + + if len(tasks) == 0 { + log.Debugf("[Typesense Sync] No tasks changed since the last sync, not syncing") + } + + currentSync.SyncFinishedAt = time.Now() + _, err = s.Where("collection = ?", "tasks"). + Cols("sync_finished_at"). + Update(currentSync) + if err != nil { + _ = s.Rollback() + return err + } + + return s.Commit() +} + +func RegisterPeriodicTypesenseResyncCron() { + if !config.TypesenseEnabled.GetBool() { + log.Debugf("[Typesense Sync] Typesense is disabled, not setting up sync cron") + return + } + + err := cron.Schedule("* * * * *", func() { + err := SyncUpdatedTasksIntoTypesense() + if err != nil { + log.Fatalf("[Typesense Sync] Could not sync updated tasks into typesense: %s", err) + } + }) + if err != nil { + log.Fatalf("[Typesense Sync] Could not register typesense resync cron: %s", err) + } +}