feat(tasks): add periodic resync of updated tasks to Typesense

This commit is contained in:
kolaente 2023-08-28 13:26:40 +02:00
parent 010b4ce783
commit 1a82d6da44
Signed by untrusted user: konrad
GPG Key ID: F40E70337AB24C9B
3 changed files with 175 additions and 7 deletions

View File

@ -89,6 +89,7 @@ func FullInit() {
models.RegisterUserDeletionCron()
models.RegisterOldExportCleanupCron()
openid.CleanupSavedOpenIDProviders()
models.RegisterPeriodicTypesenseResyncCron()
// Start processing events
go func() {

View File

@ -0,0 +1,46 @@
// Vikunja is a to-do list application to facilitate your life.
// Copyright 2018-2021 Vikunja and contributors. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public Licensee as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public Licensee for more details.
//
// You should have received a copy of the GNU Affero General Public Licensee
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package migration
import (
"src.techknowlogick.com/xormigrate"
"time"
"xorm.io/xorm"
)
type typesenseSync20230828125443 struct {
Collection string `xorm:"not null"`
SyncStartedAt time.Time `xorm:"not null"`
SyncFinishedAt time.Time `xorm:"null"`
}
func (typesenseSync20230828125443) TableName() string {
return "typesense_sync"
}
func init() {
migrations = append(migrations, &xormigrate.Migration{
ID: "20230828125443",
Description: "",
Migrate: func(tx *xorm.Engine) error {
return tx.Sync2(typesenseSync20230828125443{})
},
Rollback: func(tx *xorm.Engine) error {
return nil
},
})
}

View File

@ -17,16 +17,27 @@
package models
import (
"code.vikunja.io/api/pkg/cron"
"code.vikunja.io/api/pkg/log"
"fmt"
"time"
"xorm.io/xorm"
"code.vikunja.io/api/pkg/config"
"code.vikunja.io/api/pkg/db"
"code.vikunja.io/api/pkg/user"
"fmt"
"github.com/typesense/typesense-go/typesense"
"github.com/typesense/typesense-go/typesense/api"
"github.com/typesense/typesense-go/typesense/api/pointer"
)
type TypesenseSync struct {
Collection string `xorm:"not null"`
SyncStartedAt time.Time `xorm:"not null"`
SyncFinishedAt time.Time `xorm:"null"`
}
var typesenseClient *typesense.Client
func InitTypesense() {
@ -190,16 +201,39 @@ func ReindexAllTasks() (err error) {
s := db.NewSession()
defer s.Close()
currentSync := &TypesenseSync{
Collection: "tasks",
SyncStartedAt: time.Now(),
}
_, err = s.Insert(currentSync)
if err != nil {
return err
}
err = s.Find(tasks)
if err != nil {
return err
}
err = reindexTasks(s, tasks)
if err != nil {
return err
}
currentSync.SyncFinishedAt = time.Now()
_, err = s.Where("collection = ?", "tasks").
Cols("sync_finished_at").
Update(currentSync)
return
}
func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) {
err = addMoreInfoToTasks(s, tasks, &user.User{ID: 1})
if err != nil {
return err
}
typesenseTasks := []interface{}{}
for _, task := range tasks {
searchTask := convertTaskToTypesenseTask(task)
@ -209,12 +243,18 @@ func ReindexAllTasks() (err error) {
return err
}
_, err = typesenseClient.Collection("tasks").
Documents().
Create(searchTask)
if err != nil {
return err
}
typesenseTasks = append(typesenseTasks, searchTask)
}
_, err = typesenseClient.Collection("tasks").
Documents().
Import(typesenseTasks, &api.ImportDocumentsParams{
Action: pointer.String("upsert"),
BatchSize: pointer.Int(100),
})
if err != nil {
return err
}
return nil
@ -301,3 +341,84 @@ func convertTaskToTypesenseTask(task *Task) *typesenseTask {
return tt
}
func SyncUpdatedTasksIntoTypesense() (err error) {
tasks := make(map[int64]*Task)
s := db.NewSession()
_ = s.Begin()
defer s.Close()
lastSync := &TypesenseSync{}
has, err := s.Where("collection = ?", "tasks").
Get(lastSync)
if err != nil {
_ = s.Rollback()
return err
}
if !has {
log.Errorf("[Typesense Sync] No typesense sync stats yet, please run a full index via the CLI first")
_ = s.Rollback()
return
}
currentSync := &TypesenseSync{SyncStartedAt: time.Now()}
_, err = s.Where("collection = ?", "tasks").
Cols("sync_started_at", "sync_finished_at").
Update(currentSync)
if err != nil {
_ = s.Rollback()
return
}
err = s.
Where("updated >= ?", lastSync.SyncStartedAt).
Find(tasks)
if err != nil {
_ = s.Rollback()
return
}
if len(tasks) > 0 {
log.Debugf("[Typesense Sync] Updating %d tasks", len(tasks))
err = reindexTasks(s, tasks)
if err != nil {
_ = s.Rollback()
return
}
}
if len(tasks) == 0 {
log.Debugf("[Typesense Sync] No tasks changed since the last sync, not syncing")
}
currentSync.SyncFinishedAt = time.Now()
_, err = s.Where("collection = ?", "tasks").
Cols("sync_finished_at").
Update(currentSync)
if err != nil {
_ = s.Rollback()
return err
}
return s.Commit()
}
func RegisterPeriodicTypesenseResyncCron() {
if !config.TypesenseEnabled.GetBool() {
log.Debugf("[Typesense Sync] Typesense is disabled, not setting up sync cron")
return
}
err := cron.Schedule("* * * * *", func() {
err := SyncUpdatedTasksIntoTypesense()
if err != nil {
log.Fatalf("[Typesense Sync] Could not sync updated tasks into typesense: %s", err)
}
})
if err != nil {
log.Fatalf("[Typesense Sync] Could not register typesense resync cron: %s", err)
}
}