maildir: enable backend threads

Add thread capability to maildir backend. Enables the maildir worker to
thread the entire folder, as opposed to only the displayed messages via
client side threads.

Signed-off-by: Tim Culverhouse <tim@timculverhouse.com>
Tested-by: Inwit <inwit@sindominio.net>
Acked-by: Robin Jarry <robin@jarry.cc>
This commit is contained in:
Tim Culverhouse
2022-11-07 10:15:50 -06:00
committed by Robin Jarry
parent 31d2f5be3c
commit 86c612e480

View File

@@ -18,6 +18,8 @@ import (
"github.com/emersion/go-maildir"
"github.com/fsnotify/fsnotify"
aercLib "git.sr.ht/~rjarry/aerc/lib"
"git.sr.ht/~rjarry/aerc/lib/iterator"
"git.sr.ht/~rjarry/aerc/logging"
"git.sr.ht/~rjarry/aerc/models"
"git.sr.ht/~rjarry/aerc/worker/handlers"
@@ -180,7 +182,7 @@ func (w *Worker) getDirectoryInfo(name string) *models.DirectoryInfo {
Caps: &models.Capabilities{
Sort: true,
Thread: false,
Thread: true,
},
}
@@ -265,6 +267,8 @@ func (w *Worker) handleMessage(msg types.WorkerMessage) error {
return w.handleOpenDirectory(msg)
case *types.FetchDirectoryContents:
return w.handleFetchDirectoryContents(msg)
case *types.FetchDirectoryThreaded:
return w.handleFetchDirectoryThreaded(msg)
case *types.CreateDirectory:
return w.handleCreateDirectory(msg)
case *types.RemoveDirectory:
@@ -459,6 +463,7 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32,
<-limit
}(uid)
}
wg.Wait()
sortedUids, err := lib.Sort(msgInfos, criteria)
if err != nil {
@@ -468,6 +473,85 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32,
return sortedUids, nil
}
func (w *Worker) handleFetchDirectoryThreaded(
msg *types.FetchDirectoryThreaded,
) error {
var (
uids []uint32
err error
)
if len(msg.FilterCriteria) > 1 {
filter, err := parseSearch(msg.FilterCriteria)
if err != nil {
return err
}
uids, err = w.search(filter)
if err != nil {
return err
}
} else {
uids, err = w.c.UIDs(*w.selected)
if err != nil {
logging.Errorf("failed scanning uids: %v", err)
return err
}
}
threads, err := w.threads(uids, msg.SortCriteria)
if err != nil {
logging.Errorf("failed sorting directory: %v", err)
return err
}
w.currentSortCriteria = msg.SortCriteria
w.worker.PostMessage(&types.DirectoryThreaded{
Message: types.RespondTo(msg),
Threads: threads,
}, nil)
return nil
}
func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*types.Thread, error) {
builder := aercLib.NewThreadBuilder(iterator.NewFactory(false))
msgInfos := make([]*models.MessageInfo, 0, len(uids))
mu := sync.Mutex{}
wg := sync.WaitGroup{}
max := runtime.NumCPU() * 2
limit := make(chan struct{}, max)
for _, uid := range uids {
limit <- struct{}{}
wg.Add(1)
go func(uid uint32) {
defer wg.Done()
info, err := w.msgHeadersFromUid(uid)
if err != nil {
logging.Errorf("could not get message info: %v", err)
<-limit
return
}
mu.Lock()
builder.Update(info)
msgInfos = append(msgInfos, info)
mu.Unlock()
<-limit
}(uid)
}
wg.Wait()
var err error
switch {
case len(criteria) == 0:
sort.Slice(uids, func(i int, j int) bool {
return uids[i] < uids[j]
})
default:
uids, err = lib.Sort(msgInfos, criteria)
if err != nil {
logging.Errorf("could not sort the messages: %v", err)
return nil, err
}
}
threads := builder.Threads(uids, false, false)
return threads, nil
}
func (w *Worker) handleCreateDirectory(msg *types.CreateDirectory) error {
dir := w.c.Store.Dir(msg.Directory)
if err := dir.Init(); err != nil {