Fix TestAddThreads().

This commit is contained in:
Gucheng Wang 2021-12-03 10:06:29 +08:00
parent 8aaec717f1
commit fcf1359212
4 changed files with 64 additions and 38 deletions

View File

@ -76,13 +76,13 @@ func uploadDiscuzxFile(username string, fileBytes []byte, fileName string, creat
return fileUrl
}
func uploadAttachmentAndUpdatePost(attachment *Attachment, post *Post) {
func getRecordFromAttachment(attachment *Attachment, post *Post) *object.UploadFileRecord {
oldFileUrl := fmt.Sprintf("%s%s", discuzxAttachmentBaseUrl, attachment.Attachment)
fileBytes, _, err := downloadFile(oldFileUrl)
if err != nil {
if urlError, ok := err.(*url.Error); ok {
fmt.Printf("[%d]: uploadAttachmentAndUpdatePost() error: %s, the attachement is deleted: %s\n", post.Pid, urlError.Error(), attachment.Attachment)
return
fmt.Printf("\t\t[%d]: getRecordFromAttachment() error: %s, the attachement is deleted: %s\n", post.Pid, urlError.Error(), attachment.Attachment)
return nil
} else {
panic(err)
}
@ -95,14 +95,10 @@ func uploadAttachmentAndUpdatePost(attachment *Attachment, post *Post) {
fileType = "image"
}
record := object.UploadFileRecord{
record := &object.UploadFileRecord{
FileName: attachment.Filename,
FileUrl: fileUrl,
FileType: fileType,
}
if post.UploadFileRecords == nil {
post.UploadFileRecords = []*object.UploadFileRecord{}
}
post.UploadFileRecords = append(post.UploadFileRecords, &record)
return record
}

View File

@ -82,6 +82,8 @@ func addThread(thread *Thread, threadPostsMap map[int][]*Post, attachments []*At
//deleteWholeTopic(thread)
mutex := sync.RWMutex{}
var wg sync.WaitGroup
wg.Add(len(attachments))
for _, attachment := range attachments {
@ -90,7 +92,16 @@ func addThread(thread *Thread, threadPostsMap map[int][]*Post, attachments []*At
post := postMap[attachment.Pid]
if post != nil {
uploadAttachmentAndUpdatePost(attachment, post)
record := getRecordFromAttachment(attachment, post)
if record != nil {
mutex.Lock()
if post.UploadFileRecords == nil {
post.UploadFileRecords = []*object.UploadFileRecord{}
}
post.UploadFileRecords = append(post.UploadFileRecords, record)
mutex.Unlock()
}
}
}(attachment)
}

View File

@ -23,7 +23,44 @@ import (
"github.com/casbin/casnode/object"
)
var AddThreadsConcurrency = 20
var AddThreadsConcurrency = 100
var AddThreadsBatchSize = 10000
func addThreads(threads []*Thread, threadPostsMap map[int][]*Post, attachmentMap map[int][]*Attachment, forumMap map[int]*Forum, classMap map[int]*Class) {
arrayMutex := sync.RWMutex{}
var wg sync.WaitGroup
wg.Add(len(threads))
sem := make(chan int, AddThreadsConcurrency)
topics := []*object.Topic{}
replies := []*object.Reply{}
for i, thread := range threads {
sem <- 1
go func(i int, thread *Thread) {
defer wg.Done()
attachments := attachmentMap[thread.Tid]
forum := forumMap[thread.Fid]
topic, replies2 := addThread(thread, threadPostsMap, attachments, forum, classMap)
if topic != nil && replies2 != nil {
arrayMutex.Lock()
topics = append(topics, topic)
replies = append(replies, replies2...)
arrayMutex.Unlock()
fmt.Printf("\t[%d/%d]: Added thread: tid = %d, fid = %d, replies = %d\n", i+1, len(threads), thread.Tid, thread.Fid, len(replies2))
} else {
fmt.Printf("\t[%d/%d]: Added thread: tid = %d, fid = %d, empty thread, removed\n", i+1, len(threads), thread.Tid, thread.Fid)
}
<-sem
}(i, thread)
}
wg.Wait()
object.AddTopicsInBatch(topics)
object.AddRepliesInBatch(replies)
}
func TestAddThreads(t *testing.T) {
object.InitConfig()
@ -42,33 +79,15 @@ func TestAddThreads(t *testing.T) {
threadPostsMap, postCount := getThreadPostsMap()
fmt.Printf("Loaded posts: %d\n", postCount)
arrayMutex := sync.RWMutex{}
for i := 0; i < (len(threads)-1)/AddThreadsBatchSize+1; i++ {
start := i * AddThreadsBatchSize
end := (i + 1) * AddThreadsBatchSize
if end > len(threads) {
end = len(threads)
}
var wg sync.WaitGroup
wg.Add(len(threads))
sem := make(chan int, SyncAvatarsConcurrency)
topics := []*object.Topic{}
replies := []*object.Reply{}
for i, thread := range threads {
sem <- 1
go func(i int, thread *Thread) {
defer wg.Done()
attachments := attachmentMap[thread.Tid]
forum := forumMap[thread.Fid]
topic, replies2 := addThread(thread, threadPostsMap, attachments, forum, classMap)
arrayMutex.Lock()
topics = append(topics, topic)
replies = append(replies, replies2...)
arrayMutex.Unlock()
fmt.Printf("[%d/%d]: Added thread: tid = %d, fid = %d, replies = %d\n", i+1, len(threads), thread.Tid, thread.Fid, len(replies2))
<-sem
}(i, thread)
tmp := threads[start:end]
fmt.Printf("Add threads: [%d - %d].\n", start, end)
addThreads(tmp, threadPostsMap, attachmentMap, forumMap, classMap)
}
wg.Wait()
object.AddTopicsInBatch(topics)
object.AddRepliesInBatch(replies)
}