diff --git a/backend/constants/channels.go b/backend/constants/channels.go new file mode 100644 index 000000000..c38a5ac99 --- /dev/null +++ b/backend/constants/channels.go @@ -0,0 +1,9 @@ +package constants + +const ( + ChannelAllNode = "nodes:public" + + ChannelWorkerNode = "nodes:" + + ChannelMasterNode = "nodes:master" +) diff --git a/backend/constants/message.go b/backend/constants/message.go index f76e8fc3c..72e5fab23 100644 --- a/backend/constants/message.go +++ b/backend/constants/message.go @@ -5,4 +5,5 @@ const ( MsgTypeGetSystemInfo = "get-sys-info" MsgTypeCancelTask = "cancel-task" MsgTypeRemoveLog = "remove-log" + MsgTypeRemoveSpider = "remove-spider" ) diff --git a/backend/constants/task.go b/backend/constants/task.go index 5eeee9678..b6fb615cb 100644 --- a/backend/constants/task.go +++ b/backend/constants/task.go @@ -1,11 +1,18 @@ package constants const ( - StatusPending string = "pending" - StatusRunning string = "running" - StatusFinished string = "finished" - StatusError string = "error" + // 调度中 + StatusPending string = "pending" + // 运行中 + StatusRunning string = "running" + // 已完成 + StatusFinished string = "finished" + // 错误 + StatusError string = "error" + // 取消 StatusCancelled string = "cancelled" + // 节点重启导致的异常终止 + StatusAbnormal string = "abnormal" ) const ( diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 8487df11f..0eb8639b5 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -33,7 +33,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s done <- fmt.Errorf("redis pubsub receive err: %v", msg) return case redis.Message: - fmt.Println(msg) if err := consume(msg); err != nil { fmt.Printf("redis pubsub consume message err: %v", err) continue diff --git a/backend/entity/common.go b/backend/entity/common.go new file mode 100644 index 000000000..332cc4944 --- /dev/null +++ b/backend/entity/common.go @@ -0,0 +1,17 @@ +package entity + +import "strconv" + +type Page 
struct { + Skip int + Limit int + PageNum int + PageSize int +} + +func (p *Page)GetPage(pageNum string, pageSize string) { + p.PageNum, _ = strconv.Atoi(pageNum) + p.PageSize, _ = strconv.Atoi(pageSize) + p.Skip = p.PageSize * (p.PageNum - 1) + p.Limit = p.PageSize +} \ No newline at end of file diff --git a/backend/entity/node.go b/backend/entity/node.go new file mode 100644 index 000000000..cf52fafbc --- /dev/null +++ b/backend/entity/node.go @@ -0,0 +1,25 @@ +package entity + +type NodeMessage struct { + // 通信类别 + Type string `json:"type"` + + // 任务相关 + TaskId string `json:"task_id"` // 任务ID + + // 节点相关 + NodeId string `json:"node_id"` // 节点ID + + // 日志相关 + LogPath string `json:"log_path"` // 日志路径 + Log string `json:"log"` // 日志 + + // 系统信息 + SysInfo SystemInfo `json:"sys_info"` + + // 爬虫相关 + SpiderId string `json:"spider_id"` //爬虫ID + + // 错误相关 + Error string `json:"error"` +} diff --git a/backend/entity/spider.go b/backend/entity/spider.go new file mode 100644 index 000000000..7f5e02b42 --- /dev/null +++ b/backend/entity/spider.go @@ -0,0 +1,6 @@ +package entity + +type SpiderType struct { + Type string `json:"type" bson:"_id"` + Count int `json:"count" bson:"count"` +} diff --git a/backend/entity/system.go b/backend/entity/system.go new file mode 100644 index 000000000..dff637b7e --- /dev/null +++ b/backend/entity/system.go @@ -0,0 +1,15 @@ +package entity + +type SystemInfo struct { + ARCH string `json:"arch"` + OS string `json:"os"` + Hostname string `json:"host_name"` + NumCpu int `json:"num_cpu"` + Executables []Executable `json:"executables"` +} + +type Executable struct { + Path string `json:"path"` + FileName string `json:"file_name"` + DisplayName string `json:"display_name"` +} diff --git a/backend/main.go b/backend/main.go index bf98674ee..5d95dbaf4 100644 --- a/backend/main.go +++ b/backend/main.go @@ -5,6 +5,7 @@ import ( "crawlab/database" "crawlab/lib/validate_bridge" "crawlab/middlewares" + "crawlab/model" "crawlab/routes" "crawlab/services" 
"github.com/apex/log" @@ -57,7 +58,7 @@ func main() { } log.Info("初始化Redis数据库成功") - if services.IsMaster() { + if model.IsMaster() { // 初始化定时任务 if err := services.InitScheduler(); err != nil { log.Error("init scheduler error:" + err.Error()) @@ -99,7 +100,7 @@ func main() { log.Info("初始化用户服务成功") // 以下为主节点服务 - if services.IsMaster() { + if model.IsMaster() { // 中间件 app.Use(middlewares.CORSMiddleware()) //app.Use(middlewares.AuthorizationMiddleware()) @@ -131,6 +132,7 @@ func main() { authGroup.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入 authGroup.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录 authGroup.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据 + authGroup.GET("/spider/types", routes.GetSpiderTypes) // 爬虫类型 // 任务 authGroup.GET("/tasks", routes.GetTaskList) // 任务列表 authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情 diff --git a/backend/model/file.go b/backend/model/file.go index 3cea7b398..fe3ece0e5 100644 --- a/backend/model/file.go +++ b/backend/model/file.go @@ -1,11 +1,24 @@ package model import ( + "crawlab/database" "crawlab/utils" "github.com/apex/log" + "github.com/globalsign/mgo/bson" "os" + "runtime/debug" + "time" ) +type GridFs struct { + Id bson.ObjectId `json:"_id" bson:"_id"` + ChunkSize int32 `json:"chunk_size" bson:"chunkSize"` + UploadDate time.Time `json:"upload_date" bson:"uploadDate"` + Length int32 `json:"length" bson:"length"` + Md5 string `json:"md_5" bson:"md5"` + Filename string `json:"filename" bson:"filename"` +} + type File struct { Name string `json:"name"` Path string `json:"path"` @@ -13,12 +26,49 @@ type File struct { Size int64 `json:"size"` } +func (f *GridFs) Remove() { + s, gf := database.GetGridFs("files") + defer s.Close() + if err := gf.RemoveId(f.Id); err != nil { + log.Errorf("remove file id error: %s, id: %s", err.Error(), f.Id.Hex()) + debug.PrintStack() + } +} + +func GetAllGridFs() []*GridFs { + s, gf := database.GetGridFs("files") + defer s.Close() + + var files []*GridFs + if err := 
gf.Find(nil).All(&files); err != nil { + log.Errorf("get all files error: %s", err.Error()) + debug.PrintStack() + return nil + } + return files +} + +func GetGridFs(id bson.ObjectId) *GridFs { + s, gf := database.GetGridFs("files") + defer s.Close() + + var gfFile GridFs + err := gf.Find(bson.M{"_id": id}).One(&gfFile) + if err != nil { + log.Errorf("get gf file error: %s, file_id: %s", err.Error(), id.Hex()) + debug.PrintStack() + return nil + } + return &gfFile +} + func RemoveFile(path string) error { if !utils.Exists(path) { log.Info("file not found: " + path) + debug.PrintStack() return nil } - if err := os.Remove(path); err != nil { + if err := os.RemoveAll(path); err != nil { return err } return nil diff --git a/backend/model/node.go b/backend/model/node.go index 6211115c3..7af93dbe7 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -7,6 +7,7 @@ import ( "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + "github.com/spf13/viper" "runtime/debug" "time" ) @@ -30,6 +31,73 @@ type Node struct { UpdateTsUnix int64 `json:"update_ts_unix" bson:"update_ts_unix"` } +const ( + Yes = "Y" + No = "N" +) + +// Whether the current node is the master node +func IsMaster() bool { + return viper.GetString("server.master") == Yes +} + +// Get the local node +func GetCurrentNode() (Node, error) { + // Get the registered key + key, err := register.GetRegister().GetKey() + if err != nil { + return Node{}, err + } + + // Fetch the current node from the database + var node Node + errNum := 0 + for { + // Panic once the error count reaches 10 + if errNum >= 10 { + panic("cannot get current node") + } + + // Try to fetch the node + node, err = GetNodeByKey(key) + // If the lookup failed + if err != nil { + // On the master node this means first registration: insert the node record + if IsMaster() { + // Get the local machine info + ip, mac, key, err := GetNodeBaseInfo() + if err != nil { + debug.PrintStack() + return node, err + } + + // Build the node + node = Node{ + Key: key, + Id: bson.NewObjectId(), + Ip: ip, + Name: ip, + Mac: mac, + IsMaster: true, + } + if err := node.Add(); err != nil { + return node, err + } + return node, nil + } + //
增加错误次数 + errNum++ + + // 5秒后重试 + time.Sleep(5 * time.Second) + continue + } + // 跳出循环 + break + } + return node, nil +} + func (n *Node) Save() error { s, c := database.GetCol("nodes") defer s.Close() diff --git a/backend/model/schedule.go b/backend/model/schedule.go index 1c8db0bd6..6415e22b4 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -65,7 +65,8 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { // 获取爬虫名称 spider, err := GetSpider(schedule.SpiderId) if err != nil { - log.Errorf(err.Error()) + log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error()) + debug.PrintStack() continue } schedules[i].SpiderName = spider.Name diff --git a/backend/model/spider.go b/backend/model/spider.go index e0e5f8369..1f88acff2 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -2,6 +2,7 @@ package model import ( "crawlab/database" + "crawlab/entity" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -18,12 +19,12 @@ type Spider struct { Id bson.ObjectId `json:"_id" bson:"_id"` // 爬虫ID Name string `json:"name" bson:"name"` // 爬虫名称(唯一) DisplayName string `json:"display_name" bson:"display_name"` // 爬虫显示名称 - Type string `json:"type"` // 爬虫类别 + Type string `json:"type" bson:"type"` // 爬虫类别 FileId bson.ObjectId `json:"file_id" bson:"file_id"` // GridFS文件ID - Col string `json:"col"` // 结果储存位置 - Site string `json:"site"` // 爬虫网站 + Col string `json:"col" bson:"col"` // 结果储存位置 + Site string `json:"site" bson:"site"` // 爬虫网站 Envs []Env `json:"envs" bson:"envs"` // 环境变量 - Remark string `json:"remark"` // 备注 + Remark string `json:"remark" bson:"remark"` // 备注 // 自定义爬虫 Src string `json:"src" bson:"src"` // 源码位置 Cmd string `json:"cmd" bson:"cmd"` // 执行命令 @@ -47,6 +48,7 @@ type Spider struct { UpdateTs time.Time `json:"update_ts" bson:"update_ts"` } +// 更新爬虫 func (spider *Spider) Save() error { s, c := database.GetCol("spiders") defer s.Close() @@ -60,6 +62,7 @@ func (spider 
*Spider) Save() error { return nil } +// 新增爬虫 func (spider *Spider) Add() error { s, c := database.GetCol("spiders") defer s.Close() @@ -74,6 +77,7 @@ func (spider *Spider) Add() error { return nil } +// 获取爬虫的任务 func (spider *Spider) GetTasks() ([]Task, error) { tasks, err := GetTaskList(bson.M{"spider_id": spider.Id}, 0, 10, "-create_ts") if err != nil { @@ -82,6 +86,7 @@ func (spider *Spider) GetTasks() ([]Task, error) { return tasks, nil } +// 爬虫最新的任务 func (spider *Spider) GetLastTask() (Task, error) { tasks, err := GetTaskList(bson.M{"spider_id": spider.Id}, 0, 1, "-create_ts") if err != nil { @@ -93,7 +98,8 @@ func (spider *Spider) GetLastTask() (Task, error) { return tasks[0], nil } -func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { +// 爬虫列表 +func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, int, error) { s, c := database.GetCol("spiders") defer s.Close() @@ -101,7 +107,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { spiders := []Spider{} if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil { debug.PrintStack() - return spiders, err + return spiders, 0, err } // 遍历爬虫列表 @@ -119,9 +125,40 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { spiders[i].LastStatus = task.Status } - return spiders, nil + count, _ := c.Find(filter).Count() + + return spiders, count, nil +} + +// 获取爬虫 +func GetSpiderByFileId(fileId bson.ObjectId) *Spider { + s, c := database.GetCol("spiders") + defer s.Close() + + var result *Spider + if err := c.Find(bson.M{"file_id": fileId}).One(&result); err != nil { + log.Errorf("get spider error: %s, file_id: %s", err.Error(), fileId.Hex()) + debug.PrintStack() + return nil + } + return result +} + +// 获取爬虫 +func GetSpiderByName(name string) *Spider { + s, c := database.GetCol("spiders") + defer s.Close() + + var result *Spider + if err := c.Find(bson.M{"name": name}).One(&result); err != nil 
{ + log.Errorf("get spider error: %s, spider_name: %s", err.Error(), name) + debug.PrintStack() + return nil + } + return result } +// Get a spider func GetSpider(id bson.ObjectId) (Spider, error) { s, c := database.GetCol("spiders") defer s.Close() @@ -129,6 +166,7 @@ func GetSpider(id bson.ObjectId) (Spider, error) { var result Spider if err := c.FindId(id).One(&result); err != nil { if err != mgo.ErrNotFound { + log.Errorf("get spider error: %s, id: %s", err.Error(), id.Hex()) debug.PrintStack() } return result, err @@ -136,6 +174,7 @@ func GetSpider(id bson.ObjectId) (Spider, error) { return result, nil } +// Update a spider func UpdateSpider(id bson.ObjectId, item Spider) error { s, c := database.GetCol("spiders") defer s.Close() @@ -152,6 +191,7 @@ func UpdateSpider(id bson.ObjectId, item Spider) error { return nil } +// Remove a spider func RemoveSpider(id bson.ObjectId) error { s, c := database.GetCol("spiders") defer s.Close() @@ -162,6 +202,8 @@ func RemoveSpider(id bson.ObjectId) error { } if err := c.RemoveId(id); err != nil { + log.Errorf("remove spider error: %s, id:%s", err.Error(), id.Hex()) + debug.PrintStack() return err } @@ -171,12 +213,14 @@ func RemoveSpider(id bson.ObjectId) error { if err := gf.RemoveId(result.FileId); err != nil { log.Error("remove file error, id:" + result.FileId.Hex()) + debug.PrintStack() return err } return nil } +// Remove all spiders func RemoveAllSpider() error { s, c := database.GetCol("spiders") defer s.Close() @@ -195,6 +239,7 @@ func RemoveAllSpider() error { return nil } +// Total number of spiders func GetSpiderCount() (int, error) { s, c := database.GetCol("spiders") defer s.Close() @@ -203,6 +248,27 @@ func GetSpiderCount() (int, error) { if err != nil { return 0, err } - return count, nil } + +// Spider types +func GetSpiderTypes() ([]*entity.SpiderType, error) { + s, c := database.GetCol("spiders") + defer s.Close() + + + group := bson.M{ + "$group": bson.M{ + "_id": "$type", + "count": bson.M{"$sum": 1}, + }, + } + var types []*entity.SpiderType + if err :=
c.Pipe([]bson.M{ group}).All(&types); err != nil { + log.Errorf("get spider types error: %s", err.Error()) + debug.PrintStack() + return nil, err + } + + return types, nil +} diff --git a/backend/model/system.go b/backend/model/system.go index 6091c963e..5c2f5997e 100644 --- a/backend/model/system.go +++ b/backend/model/system.go @@ -1,6 +1,7 @@ package model import ( + "crawlab/entity" "github.com/apex/log" "io/ioutil" "os" @@ -35,21 +36,7 @@ var executableNameMap = map[string]string{ "bash": "bash", } -type SystemInfo struct { - ARCH string `json:"arch"` - OS string `json:"os"` - Hostname string `json:"host_name"` - NumCpu int `json:"num_cpu"` - Executables []Executable `json:"executables"` -} - -type Executable struct { - Path string `json:"path"` - FileName string `json:"file_name"` - DisplayName string `json:"display_name"` -} - -func GetLocalSystemInfo() (sysInfo SystemInfo, err error) { +func GetLocalSystemInfo() (sysInfo entity.SystemInfo, err error) { executables, err := GetExecutables() if err != nil { return sysInfo, err @@ -60,7 +47,7 @@ func GetLocalSystemInfo() (sysInfo SystemInfo, err error) { return sysInfo, err } - return SystemInfo{ + return entity.SystemInfo{ ARCH: runtime.GOARCH, OS: runtime.GOOS, NumCpu: runtime.GOMAXPROCS(0), @@ -78,7 +65,7 @@ func GetPathValues() (paths []string) { return strings.Split(pathEnv, ":") } -func GetExecutables() (executables []Executable, err error) { +func GetExecutables() (executables []entity.Executable, err error) { pathValues := GetPathValues() cache := map[string]string{} @@ -97,7 +84,7 @@ func GetExecutables() (executables []Executable, err error) { if cache[filePath] == "" { if displayName != "" { - executables = append(executables, Executable{ + executables = append(executables, entity.Executable{ Path: filePath, FileName: file.Name(), DisplayName: displayName, diff --git a/backend/model/task.go b/backend/model/task.go index 177edccbb..f568b7fe1 100644 --- a/backend/model/task.go +++ 
b/backend/model/task.go @@ -25,6 +25,7 @@ type Task struct { WaitDuration float64 `json:"wait_duration" bson:"wait_duration"` RuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"` TotalDuration float64 `json:"total_duration" bson:"total_duration"` + Pid int `json:"pid" bson:"pid"` // 前端数据 SpiderName string `json:"spider_name"` @@ -191,6 +192,7 @@ func RemoveTask(id string) error { return nil } +// 删除task by spider_id func RemoveTaskBySpiderId(id bson.ObjectId) error { tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") if err != nil { @@ -206,6 +208,7 @@ func RemoveTaskBySpiderId(id bson.ObjectId) error { return nil } +// task 总数 func GetTaskCount(query interface{}) (int, error) { s, c := database.GetCol("tasks") defer s.Close() @@ -308,6 +311,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) { return dailyItems, nil } +// 更新task的结果数 func UpdateTaskResultCount(id string) (err error) { // 获取任务 task, err := GetTask(id) @@ -343,3 +347,25 @@ func UpdateTaskResultCount(id string) (err error) { } return nil } + +func UpdateTaskToAbnormal(nodeId bson.ObjectId) error { + s, c := database.GetCol("tasks") + defer s.Close() + + selector := bson.M{ + "node_id": nodeId, + "status": constants.StatusRunning, + } + update := bson.M{ + "$set": bson.M{ + "status": constants.StatusAbnormal, + }, + } + _, err := c.UpdateAll(selector, update) + if err != nil { + log.Errorf("update task to abnormal error: %s, node_id : %s", err.Error(), nodeId.Hex()) + debug.PrintStack() + return err + } + return nil +} diff --git a/backend/routes/spider.go b/backend/routes/spider.go index e0afb1a84..addddd999 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -3,6 +3,7 @@ package routes import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/model" "crawlab/services" "crawlab/utils" @@ -11,7 +12,7 @@ import ( "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" 
"github.com/pkg/errors" - uuid "github.com/satori/go.uuid" + "github.com/satori/go.uuid" "github.com/spf13/viper" "io/ioutil" "net/http" @@ -24,7 +25,22 @@ import ( ) func GetSpiderList(c *gin.Context) { - results, err := model.GetSpiderList(nil, 0, 0) + pageNum, _ := c.GetQuery("pageNum") + pageSize, _ := c.GetQuery("pageSize") + keyword, _ := c.GetQuery("keyword") + t, _ := c.GetQuery("type") + + filter := bson.M{ + "name": bson.M{"$regex": bson.RegEx{Pattern: keyword, Options: "im"}}, + } + + if t != "" { + filter["type"] = t + } + + page := &entity.Page{} + page.GetPage(pageNum, pageSize) + results, count, err := model.GetSpiderList(filter, page.Skip, page.Limit) if err != nil { HandleError(http.StatusInternalServerError, c, err) return @@ -32,7 +48,7 @@ func GetSpiderList(c *gin.Context) { c.JSON(http.StatusOK, Response{ Status: "ok", Message: "success", - Data: results, + Data: bson.M{"list": results, "total": count}, }) } @@ -79,18 +95,6 @@ func PostSpider(c *gin.Context) { }) } -func PublishAllSpiders(c *gin.Context) { - if err := services.PublishAllSpiders(); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) -} - func PublishSpider(c *gin.Context) { id := c.Param("id") @@ -104,10 +108,7 @@ func PublishSpider(c *gin.Context) { return } - if err := services.PublishSpider(spider); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } + services.PublishSpider(spider) c.JSON(http.StatusOK, Response{ Status: "ok", @@ -117,7 +118,7 @@ func PublishSpider(c *gin.Context) { func PutSpider(c *gin.Context) { // 从body中获取文件 - file, err := c.FormFile("file") + uploadFile, err := c.FormFile("file") if err != nil { debug.PrintStack() HandleError(http.StatusInternalServerError, c, err) @@ -125,7 +126,7 @@ func PutSpider(c *gin.Context) { } // 如果不为zip文件,返回错误 - if !strings.HasSuffix(file.Filename, ".zip") { + if 
!strings.HasSuffix(uploadFile.Filename, ".zip") { debug.PrintStack() HandleError(http.StatusBadRequest, c, errors.New("Not a valid zip file")) return @@ -145,57 +146,54 @@ func PutSpider(c *gin.Context) { // 保存到本地临时文件 randomId := uuid.NewV4() tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") - if err := c.SaveUploadedFile(file, tmpFilePath); err != nil { + if err := c.SaveUploadedFile(uploadFile, tmpFilePath); err != nil { log.Error("save upload file error: " + err.Error()) debug.PrintStack() HandleError(http.StatusInternalServerError, c, err) return } - // 读取临时文件 - tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777) - if err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - return - } - if err = tmpFile.Close(); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 目标目录 - dstPath := filepath.Join( - viper.GetString("spider.path"), - strings.Replace(file.Filename, ".zip", "", 1), - ) + s, gf := database.GetGridFs("files") + defer s.Close() - // 如果目标目录已存在,删除目标目录 - if utils.Exists(dstPath) { - if err := os.RemoveAll(dstPath); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - } + // 判断文件是否已经存在 + var gfFile model.GridFs + if err := gf.Find(bson.M{"filename": uploadFile.Filename}).One(&gfFile); err == nil { + // 已经存在文件,则删除 + _ = gf.RemoveId(gfFile.Id) } - // 将临时文件解压到爬虫目录 - if err := utils.DeCompress(tmpFile, dstPath); err != nil { + // 上传到GridFs + fid, err := services.UploadToGridFs(uploadFile.Filename, tmpFilePath) + if err != nil { + log.Errorf("upload to grid fs error: %s", err.Error()) debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) return } - // 删除临时文件 - if err = os.Remove(tmpFilePath); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - return + idx := strings.LastIndex(uploadFile.Filename, "/") + targetFilename := uploadFile.Filename[idx+1:] + + // 
判断爬虫是否存在 + spiderName := strings.Replace(targetFilename, ".zip", "", 1) + spider := model.GetSpiderByName(spiderName) + if spider == nil { + // 保存爬虫信息 + srcPath := viper.GetString("spider.path") + spider := model.Spider{ + Name: spiderName, + DisplayName: spiderName, + Type: constants.Customized, + Src: filepath.Join(srcPath, spiderName), + FileId: fid, + } + _ = spider.Add() + } else { + // 更新file_id + spider.FileId = fid + _ = spider.Save() } - // 更新爬虫 - services.UpdateSpiders() - c.JSON(http.StatusOK, Response{ Status: "ok", Message: "success", @@ -210,33 +208,7 @@ func DeleteSpider(c *gin.Context) { return } - // 获取该爬虫 - spider, err := model.GetSpider(bson.ObjectIdHex(id)) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 删除爬虫文件目录 - if err := os.RemoveAll(spider.Src); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 从数据库中删除该爬虫 - if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 删除日志文件 - if err := services.RemoveLogBySpiderId(spider.Id); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 删除爬虫对应的task任务 - if err := model.RemoveTaskBySpiderId(spider.Id); err != nil { + if err := services.RemoveSpider(id); err != nil { HandleError(http.StatusInternalServerError, c, err) return } @@ -284,7 +256,8 @@ func GetSpiderDir(c *gin.Context) { } // 获取目录下文件列表 - f, err := ioutil.ReadDir(filepath.Join(spider.Src, path)) + spiderPath := viper.GetString("spider.path") + f, err := ioutil.ReadDir(filepath.Join(spiderPath, spider.Name, path)) if err != nil { HandleError(http.StatusInternalServerError, c, err) return @@ -373,6 +346,20 @@ func PostSpiderFile(c *gin.Context) { }) } +// 爬虫类型 +func GetSpiderTypes(c *gin.Context) { + types, err := model.GetSpiderTypes() + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + c.JSON(http.StatusOK, Response{ + 
Status: "ok", + Message: "success", + Data: types, + }) +} + func GetSpiderStats(c *gin.Context) { type Overview struct { TaskCount int `json:"task_count" bson:"task_count"` diff --git a/backend/services/log.go b/backend/services/log.go index 95459f8f3..485cb7dd8 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -3,9 +3,9 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "github.com/apex/log" @@ -23,7 +23,7 @@ var TaskLogChanMap = utils.NewChanMap() // 获取远端日志 func GetRemoteLog(task model.Task) (logStr string, err error) { // 序列化消息 - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeGetLog, LogPath: task.LogPath, TaskId: task.Id, @@ -85,21 +85,16 @@ func RemoveLocalLog(path string) error { // 删除远程日志 func RemoveRemoteLog(task model.Task) error { - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeRemoveLog, LogPath: task.LogPath, TaskId: task.Id, } - msgBytes, err := json.Marshal(&msg) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } // 发布获取日志消息 channel := "nodes:" + task.NodeId.Hex() - if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil { - log.Errorf(err.Error()) + if _, err := database.RedisClient.Publish(channel, utils.GetJson(msg)); err != nil { + log.Errorf("publish redis error: %s", err.Error()) + debug.PrintStack() return err } return nil @@ -119,10 +114,12 @@ func RemoveLogByTaskId(id string) error { func removeLog(t model.Task) { if err := RemoveLocalLog(t.LogPath); err != nil { - log.Error("remove local log error:" + err.Error()) + log.Errorf("remove local log error: %s", err.Error()) + debug.PrintStack() } if err := RemoveRemoteLog(t); err != nil { - log.Error("remove remote log error:" + err.Error()) + log.Errorf("remove remote log error: %s", err.Error()) + 
debug.PrintStack() } } @@ -130,7 +127,8 @@ func removeLog(t model.Task) { func RemoveLogBySpiderId(id bson.ObjectId) error { tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") if err != nil { - log.Error("get tasks error:" + err.Error()) + log.Errorf("get tasks error: %s", err.Error()) + debug.PrintStack() } for _, task := range tasks { removeLog(task) diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go index 61516bcfd..848e0c5d7 100644 --- a/backend/services/msg_handler/handler.go +++ b/backend/services/msg_handler/handler.go @@ -2,47 +2,34 @@ package msg_handler import ( "crawlab/constants" - "crawlab/model" + "crawlab/entity" ) type Handler interface { Handle() error } -func GetMsgHandler(msg NodeMessage) Handler { +func GetMsgHandler(msg entity.NodeMessage) Handler { if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { + // 日志相关 return &Log{ msg: msg, } } else if msg.Type == constants.MsgTypeCancelTask { + // 任务相关 return &Task{ msg: msg, } } else if msg.Type == constants.MsgTypeGetSystemInfo { + // 系统信息相关 return &SystemInfo{ msg: msg, } + } else if msg.Type == constants.MsgTypeRemoveSpider { + // 爬虫相关 + return &Spider{ + SpiderId: msg.SpiderId, + } } return nil } - -type NodeMessage struct { - // 通信类别 - Type string `json:"type"` - - // 任务相关 - TaskId string `json:"task_id"` // 任务ID - - // 节点相关 - NodeId string `json:"node_id"` // 节点ID - - // 日志相关 - LogPath string `json:"log_path"` // 日志路径 - Log string `json:"log"` // 日志 - - // 系统信息 - SysInfo model.SystemInfo `json:"sys_info"` - - // 错误相关 - Error string `json:"error"` -} diff --git a/backend/services/msg_handler/msg_log.go b/backend/services/msg_handler/msg_log.go index 0d09d784e..37080bd67 100644 --- a/backend/services/msg_handler/msg_log.go +++ b/backend/services/msg_handler/msg_log.go @@ -2,16 +2,15 @@ package msg_handler import ( "crawlab/constants" - "crawlab/database" + "crawlab/entity" 
"crawlab/model" "crawlab/utils" - "encoding/json" "github.com/apex/log" "runtime/debug" ) type Log struct { - msg NodeMessage + msg entity.NodeMessage } func (g *Log) Handle() error { @@ -25,31 +24,22 @@ func (g *Log) Handle() error { func (g *Log) get() error { // 发出的消息 - msgSd := NodeMessage{ + msgSd := entity.NodeMessage{ Type: constants.MsgTypeGetLog, TaskId: g.msg.TaskId, } // 获取本地日志 logStr, err := model.GetLocalLog(g.msg.LogPath) - log.Info(utils.BytesToString(logStr)) if err != nil { - log.Errorf(err.Error()) + log.Errorf("get node local log error: %s", err.Error()) debug.PrintStack() msgSd.Error = err.Error() msgSd.Log = err.Error() } else { msgSd.Log = utils.BytesToString(logStr) } - - // 序列化 - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - return err - } - // 发布消息给主节点 - log.Info("publish get log msg to master") - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { + if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { return err } return nil diff --git a/backend/services/msg_handler/msg_spider.go b/backend/services/msg_handler/msg_spider.go new file mode 100644 index 000000000..dcd6ce062 --- /dev/null +++ b/backend/services/msg_handler/msg_spider.go @@ -0,0 +1,24 @@ +package msg_handler + +import ( + "crawlab/model" + "crawlab/utils" + "github.com/globalsign/mgo/bson" + "github.com/spf13/viper" + "path/filepath" +) + +type Spider struct { + SpiderId string +} + +func (s *Spider) Handle() error { + // 移除本地的爬虫目录 + spider, err := model.GetSpider(bson.ObjectIdHex(s.SpiderId)) + if err != nil { + return err + } + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + utils.RemoveFiles(path) + return nil +} diff --git a/backend/services/msg_handler/msg_system_info.go b/backend/services/msg_handler/msg_system_info.go index c81cb0a03..6b88e2cf0 100644 --- a/backend/services/msg_handler/msg_system_info.go +++ b/backend/services/msg_handler/msg_system_info.go @@ -2,16 
+2,13 @@ package msg_handler import ( "crawlab/constants" - "crawlab/database" + "crawlab/entity" "crawlab/model" "crawlab/utils" - "encoding/json" - "github.com/apex/log" - "runtime/debug" ) type SystemInfo struct { - msg NodeMessage + msg entity.NodeMessage } func (s *SystemInfo) Handle() error { @@ -20,19 +17,12 @@ func (s *SystemInfo) Handle() error { if err != nil { return err } - msgSd := NodeMessage{ + msgSd := entity.NodeMessage{ Type: constants.MsgTypeGetSystemInfo, NodeId: s.msg.NodeId, SysInfo: sysInfo, } - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { - log.Errorf(err.Error()) + if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { return err } return nil diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go index 1d218264b..21b954302 100644 --- a/backend/services/msg_handler/msg_task.go +++ b/backend/services/msg_handler/msg_task.go @@ -2,16 +2,39 @@ package msg_handler import ( "crawlab/constants" + "crawlab/entity" + "crawlab/model" "crawlab/utils" + "github.com/apex/log" + "runtime/debug" + "time" ) type Task struct { - msg NodeMessage + msg entity.NodeMessage } func (t *Task) Handle() error { + log.Infof("received cancel task msg, task_id: %s", t.msg.TaskId) // 取消任务 ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId) - ch <- constants.TaskCancel + if ch != nil { + ch <- constants.TaskCancel + } else { + log.Infof("chan is empty, update status to abnormal") + // The node may have been restarted, so the chan cannot be found + task, err := model.GetTask(t.msg.TaskId) + if err != nil { + log.Errorf("task not found, task_id: %s", t.msg.TaskId) + debug.PrintStack() + return err + } + task.Status = constants.StatusAbnormal + task.FinishTs = time.Now() + if err := task.Save(); err != nil { + debug.PrintStack() + log.Errorf("cancel task error: %s", err.Error()) +
} + } return nil } diff --git a/backend/services/node.go b/backend/services/node.go index 44fa39056..53af8d32a 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -1,9 +1,9 @@ package services import ( - "context" "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" "crawlab/services/msg_handler" @@ -14,7 +14,6 @@ import ( "github.com/apex/log" "github.com/globalsign/mgo/bson" "github.com/gomodule/redigo/redis" - "github.com/spf13/viper" "runtime/debug" "time" ) @@ -28,77 +27,10 @@ type Data struct { UpdateTsUnix int64 `json:"update_ts_unix"` } -const ( - Yes = "Y" - No = "N" -) - -// 获取本机节点 -func GetCurrentNode() (model.Node, error) { - // 获得注册的key值 - key, err := register.GetRegister().GetKey() - if err != nil { - return model.Node{}, err - } - - // 从数据库中获取当前节点 - var node model.Node - errNum := 0 - for { - // 如果错误次数超过10次 - if errNum >= 10 { - panic("cannot get current node") - } - - // 尝试获取节点 - node, err = model.GetNodeByKey(key) - // 如果获取失败 - if err != nil { - // 如果为主节点,表示为第一次注册,插入节点信息 - if IsMaster() { - // 获取本机信息 - ip, mac, key, err := model.GetNodeBaseInfo() - if err != nil { - debug.PrintStack() - return node, err - } - - // 生成节点 - node = model.Node{ - Key: key, - Id: bson.NewObjectId(), - Ip: ip, - Name: ip, - Mac: mac, - IsMaster: true, - } - if err := node.Add(); err != nil { - return node, err - } - return node, nil - } - // 增加错误次数 - errNum++ - - // 5秒后重试 - time.Sleep(5 * time.Second) - continue - } - // 跳出循环 - break - } - return node, nil -} - -// 当前节点是否为主节点 -func IsMaster() bool { - return viper.GetString("server.master") == Yes -} - // 所有调用IsMasterNode的方法,都永远会在master节点执行,所以GetCurrentNode方法返回永远是master节点 // 该ID的节点是否为主节点 func IsMasterNode(id string) bool { - curNode, _ := GetCurrentNode() + curNode, _ := model.GetCurrentNode() node, _ := model.GetNode(bson.ObjectIdHex(id)) return curNode.Id == node.Id } @@ -223,7 +155,7 @@ func UpdateNodeData() { Key: key, Mac: mac, Ip: ip, - Master: IsMaster(), 
+ Master: model.IsMaster(), UpdateTs: time.Now(), UpdateTsUnix: time.Now().Unix(), } @@ -243,7 +175,7 @@ func UpdateNodeData() { func MasterNodeCallback(message redis.Message) (err error) { // 反序列化 - var msg msg_handler.NodeMessage + var msg entity.NodeMessage if err := json.Unmarshal(message.Data, &msg); err != nil { return err @@ -251,7 +183,6 @@ func MasterNodeCallback(message redis.Message) (err error) { if msg.Type == constants.MsgTypeGetLog { // 获取日志 - fmt.Println(msg) time.Sleep(10 * time.Millisecond) ch := TaskLogChanMap.ChanBlocked(msg.TaskId) ch <- msg.Log @@ -268,14 +199,8 @@ func MasterNodeCallback(message redis.Message) (err error) { func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 - msg := msg_handler.NodeMessage{} - if err := json.Unmarshal(message.Data, &msg); err != nil { - - return err - } - - // worker message handle - if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil { + msg := utils.GetMessage(message) + if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil { return err } return nil @@ -297,30 +222,32 @@ func InitNodeService() error { UpdateNodeData() // 获取当前节点 - node, err := GetCurrentNode() + node, err := model.GetCurrentNode() if err != nil { log.Errorf(err.Error()) return err } - ctx := context.Background() - if IsMaster() { + + if model.IsMaster() { // 如果为主节点,订阅主节点通信频道 - channel := "nodes:master" - err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel) - if err != nil { + if err := utils.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { return err } } else { // 若为工作节点,订阅单独指定通信频道 - channel := "nodes:" + node.Id.Hex() - err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel) - if err != nil { + channel := constants.ChannelWorkerNode + node.Id.Hex() + if err := utils.Sub(channel, WorkerNodeCallback); err != nil { return err } } + // 订阅全通道 + if err := utils.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil { + return err + } + // 
如果为主节点,每30秒刷新所有节点信息 - if IsMaster() { + if model.IsMaster() { spec := "*/10 * * * * *" if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil { debug.PrintStack() @@ -328,6 +255,12 @@ func InitNodeService() error { } } + // 更新在当前节点执行中的任务状态为:abnormal + if err := model.UpdateTaskToAbnormal(node.Id); err != nil { + debug.PrintStack() + return err + } + c.Start() return nil } diff --git a/backend/services/schedule.go b/backend/services/schedule.go index 1c08e0fde..58cdf6287 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -62,12 +62,16 @@ func (s *Scheduler) Start() error { // 更新任务列表 if err := s.Update(); err != nil { + log.Errorf("update scheduler error: %s", err.Error()) + debug.PrintStack() return err } // 每30秒更新一次任务列表 spec := "*/30 * * * * *" if _, err := exec.AddFunc(spec, UpdateSchedules); err != nil { + log.Errorf("add func update schedulers error: %s", err.Error()) + debug.PrintStack() return err } @@ -80,12 +84,16 @@ func (s *Scheduler) AddJob(job model.Schedule) error { // 添加任务 eid, err := s.cron.AddFunc(spec, AddTask(job)) if err != nil { + log.Errorf("add func task error: %s", err.Error()) + debug.PrintStack() return err } // 更新EntryID job.EntryId = eid if err := job.Save(); err != nil { + log.Errorf("job save error: %s", err.Error()) + debug.PrintStack() return err } @@ -106,6 +114,8 @@ func (s *Scheduler) Update() error { // 获取所有定时任务 sList, err := model.GetScheduleList(nil) if err != nil { + log.Errorf("get scheduler list error: %s", err.Error()) + debug.PrintStack() return err } @@ -116,6 +126,8 @@ func (s *Scheduler) Update() error { // 添加到定时任务 if err := s.AddJob(job); err != nil { + log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), job.Name, job.Cron) + debug.PrintStack() return err } } @@ -128,6 +140,8 @@ func InitScheduler() error { cron: cron.New(cron.WithSeconds()), } if err := Sched.Start(); err != nil { + log.Errorf("start scheduler error: %s", err.Error()) + debug.PrintStack() return err } return 
nil diff --git a/backend/services/spider.go b/backend/services/spider.go index fdf095179..a2e9a60f1 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -1,28 +1,21 @@ package services import ( - "context" "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/spider_handler" "crawlab/utils" - "encoding/json" "fmt" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" - "github.com/gomodule/redigo/redis" - "github.com/pkg/errors" - "github.com/satori/go.uuid" "github.com/spf13/viper" - "io" - "io/ioutil" "os" "path/filepath" "runtime/debug" - "strings" - "syscall" ) type SpiderFileData struct { @@ -36,175 +29,14 @@ type SpiderUploadMessage struct { SpiderId string } -// 从项目目录中获取爬虫列表 -func GetSpidersFromDir() ([]model.Spider, error) { - // 爬虫项目目录路径 - srcPath := viper.GetString("spider.path") - - // 如果爬虫项目目录不存在,则创建一个 - if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 - defer syscall.Umask(mask) // 改为原来的 umask - if err := os.MkdirAll(srcPath, 0766); err != nil { - debug.PrintStack() - return []model.Spider{}, err - } - } - - // 获取爬虫项目目录下的所有子项 - items, err := ioutil.ReadDir(srcPath) - if err != nil { - debug.PrintStack() - return []model.Spider{}, err - } - - // 定义爬虫列表 - spiders := make([]model.Spider, 0) - - // 遍历所有子项 - for _, item := range items { - // 忽略不为目录的子项 - if !item.IsDir() { - continue - } - - // 忽略隐藏目录 - if strings.HasPrefix(item.Name(), ".") { - continue - } - - // 构造爬虫 - spider := model.Spider{ - Name: item.Name(), - DisplayName: item.Name(), - Type: constants.Customized, - Src: filepath.Join(srcPath, item.Name()), - FileId: bson.ObjectIdHex(constants.ObjectIdNull), - } - - // 将爬虫加入列表 - spiders = append(spiders, spider) - } - - return spiders, nil -} - -// 将爬虫保存到数据库 -func SaveSpiders(spiders []model.Spider) error { - s, c := database.GetCol("spiders") - defer s.Close() - - if len(spiders) == 0 { - err := model.RemoveAllSpider() 
- if err != nil { - log.Error("remove all spider error:" + err.Error()) - return err - } - log.Info("get spider from dir is empty,removed all spider") - return nil - } - // 如果该爬虫不存在于数据库,则保存爬虫到数据库 - for _, spider := range spiders { - // 忽略非自定义爬虫 - if spider.Type != constants.Customized { - continue - } - spider_ := []*model.Spider{} - _ = c.Find(bson.M{"src": spider.Src}).All(&spider_) - // 以防出现多个重复的爬虫 - if len(spider_) > 1 { - if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil { - log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) - debug.PrintStack() - continue - } - if err := spider.Add(); err != nil { - log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) - debug.PrintStack() - continue - } - continue - } - if len(spider_) == 0 { - // 不存在 - if err := spider.Add(); err != nil { - log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) - debug.PrintStack() - continue - } - } - } - return nil -} - -// 更新爬虫 -func UpdateSpiders() { - // 从项目目录获取爬虫列表 - spiders, err := GetSpidersFromDir() - if err != nil { - log.Errorf(err.Error()) - return - } - - // 储存爬虫 - if err := SaveSpiders(spiders); err != nil { - log.Errorf(err.Error()) - return - } -} - -// 打包爬虫目录为zip文件 -func ZipSpider(spider model.Spider) (filePath string, err error) { - // 如果源文件夹不存在,抛错 - if !utils.Exists(spider.Src) { - debug.PrintStack() - // 删除该爬虫,否则会一直报错 - _ = model.RemoveSpider(spider.Id) - return "", errors.New("source path does not exist") - } - - // 临时文件路径 - randomId := uuid.NewV4() - - tmpPath := viper.GetString("other.tmppath") - if !utils.Exists(tmpPath) { - if err := os.MkdirAll(tmpPath, 0777); err != nil { - log.Errorf("mkdir other.tmppath error: %v", err.Error()) - return "", err - } - } - filePath = filepath.Join(tmpPath, randomId.String()+".zip") - // 将源文件夹打包为zip文件 - d, err := os.Open(spider.Src) - if err != nil { - debug.PrintStack() - return filePath, err - } - var files []*os.File - files = append(files, d) - if err := 
utils.Compress(files, filePath); err != nil { - return filePath, err - } - - return filePath, nil -} - // 上传zip文件到GridFS -func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid bson.ObjectId, err error) { +func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) { fid = "" // 获取MongoDB GridFS连接 s, gf := database.GetGridFs("files") defer s.Close() - // 如果存在FileId删除GridFS上的老文件 - if !utils.IsObjectIdNull(spider.FileId) { - if err = gf.RemoveId(spider.FileId); err != nil { - log.Error("remove gf file:" + err.Error()) - debug.PrintStack() - } - } - // 创建一个新GridFS文件 f, err := gf.Create(fileName) if err != nil { @@ -234,6 +66,7 @@ func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid return fid, nil } +// 写入grid fs func WriteToGridFS(content []byte, f *mgo.GridFile) { if _, err := f.Write(content); err != nil { debug.PrintStack() @@ -264,149 +97,97 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre } // 发布所有爬虫 -func PublishAllSpiders() error { +func PublishAllSpiders() { // 获取爬虫列表 - spiders, err := model.GetSpiderList(nil, 0, constants.Infinite) - if err != nil { - log.Errorf(err.Error()) - return err + spiders, _, _ := model.GetSpiderList(nil, 0, constants.Infinite) + if len(spiders) == 0 { + return } - + log.Infof("start sync spider to local, total: %d", len(spiders)) // 遍历爬虫列表 for _, spider := range spiders { - // 发布爬虫 - if err := PublishSpider(spider); err != nil { - log.Errorf("publish spider error:" + err.Error()) - // return err - } - } - - return nil -} - -func PublishAllSpidersJob() { - if err := PublishAllSpiders(); err != nil { - log.Errorf(err.Error()) + // 异步发布爬虫 + go func(s model.Spider) { + PublishSpider(s) + }(spider) } } // 发布爬虫 -// 1. 将源文件夹打包为zip文件 -// 2. 上传zip文件到GridFS -// 3. 
发布消息给工作节点 -func PublishSpider(spider model.Spider) (err error) { - // 将源文件夹打包为zip文件 - filePath, err := ZipSpider(spider) - if err != nil { - return err - } - - // 上传zip文件到GridFS - fileName := filepath.Base(spider.Src) + ".zip" - fid, err := UploadToGridFs(spider, fileName, filePath) - if err != nil { - return err +func PublishSpider(spider model.Spider) { + // 查询gf file,不存在则删除 + gfFile := model.GetGridFs(spider.FileId) + if gfFile == nil { + _ = model.RemoveSpider(spider.Id) + return } - - // 保存FileId - spider.FileId = fid - if err := spider.Save(); err != nil { - return err + spiderSync := spider_handler.SpiderSync{ + Spider: spider, } - // 发布消息给工作节点 - msg := SpiderUploadMessage{ - FileId: fid.Hex(), - FileName: fileName, - SpiderId: spider.Id.Hex(), + //目录不存在,则直接下载 + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + if !utils.Exists(path) { + log.Infof("path not found: %s", path) + spiderSync.Download() + spiderSync.CreateMd5File(gfFile.Md5) + return } - msgStr, err := json.Marshal(msg) - if err != nil { + // md5文件不存在,则下载 + md5 := filepath.Join(path, spider_handler.Md5File) + if !utils.Exists(md5) { + log.Infof("md5 file not found: %s", md5) + spiderSync.RemoveSpiderFile() + spiderSync.Download() + spiderSync.CreateMd5File(gfFile.Md5) return } - channel := "files:upload" - if _, err = database.RedisClient.Publish(channel, utils.BytesToString(msgStr)); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + // md5值不一样,则下载 + md5Str := utils.ReadFileOneLine(md5) + if gfFile.Md5 != md5Str { + spiderSync.RemoveSpiderFile() + spiderSync.Download() + spiderSync.CreateMd5File(gfFile.Md5) return } - - return } -// 上传爬虫回调 -func OnFileUpload(message redis.Message) (err error) { - s, gf := database.GetGridFs("files") - defer s.Close() - - // 反序列化消息 - var msg SpiderUploadMessage - if err := json.Unmarshal(message.Data, &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } - - // 从GridFS获取该文件 - f, err := 
gf.OpenId(bson.ObjectIdHex(msg.FileId)) +func RemoveSpider(id string) error { + // 获取该爬虫 + spider, err := model.GetSpider(bson.ObjectIdHex(id)) if err != nil { - log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) - debug.PrintStack() return err } - defer f.Close() - // 生成唯一ID - randomId := uuid.NewV4() - tmpPath := viper.GetString("other.tmppath") - if !utils.Exists(tmpPath) { - if err := os.MkdirAll(tmpPath, 0777); err != nil { - log.Errorf("mkdir other.tmppath error: %v", err.Error()) - return err - } - } - // 创建临时文件 - tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") + // 删除爬虫文件目录 + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + utils.RemoveFiles(path) - tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err + // 删除其他节点的爬虫目录 + msg := entity.NodeMessage{ + Type: constants.MsgTypeRemoveSpider, + SpiderId: id, } - defer tmpFile.Close() - - // 将该文件写入临时文件 - if _, err := io.Copy(tmpFile, f); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + if err := utils.Pub(constants.ChannelAllNode, msg); err != nil { return err } - // 解压缩临时文件到目标文件夹 - dstPath := filepath.Join( - viper.GetString("spider.path"), - // strings.Replace(msg.FileName, ".zip", "", -1), - ) - if err := utils.DeCompress(tmpFile, dstPath); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + // 从数据库中删除该爬虫 + if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil { return err } - // 关闭临时文件 - if err := tmpFile.Close(); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + // 删除日志文件 + if err := RemoveLogBySpiderId(spider.Id); err != nil { return err } - // 删除临时文件 - if err := os.Remove(tmpFilePath); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + // 删除爬虫对应的task任务 + if err := model.RemoveTaskBySpiderId(spider.Id); err != nil { return err } + + // TODO 定时任务如何处理 return nil } @@ 
-414,31 +195,9 @@ func OnFileUpload(message redis.Message) (err error) { func InitSpiderService() error { // 构造定时任务执行器 c := cron.New(cron.WithSeconds()) - - if IsMaster() { - // 主节点 - - // 每5秒更新一次爬虫信息 - if _, err := c.AddFunc("*/5 * * * * *", UpdateSpiders); err != nil { - return err - } - - // 每60秒同步爬虫给工作节点 - if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil { - return err - } - } else { - // 非主节点 - - // 订阅文件上传 - channel := "files:upload" - - //sub.Connect() - ctx := context.Background() - return database.RedisClient.Subscribe(ctx, OnFileUpload, channel) - + if _, err := c.AddFunc("0 * * * * *", PublishAllSpiders); err != nil { + return err } - // 启动定时任务 c.Start() diff --git a/backend/services/spider_handler/spider.go b/backend/services/spider_handler/spider.go new file mode 100644 index 000000000..53c83b9a2 --- /dev/null +++ b/backend/services/spider_handler/spider.go @@ -0,0 +1,137 @@ +package spider_handler + +import ( + "crawlab/database" + "crawlab/model" + "crawlab/utils" + "github.com/apex/log" + "github.com/globalsign/mgo/bson" + "github.com/satori/go.uuid" + "github.com/spf13/viper" + "io" + "os" + "path/filepath" + "runtime/debug" +) + +const ( + Md5File = "md5.txt" +) + +type SpiderSync struct { + Spider model.Spider +} + +func (s *SpiderSync) CreateMd5File(md5 string) { + path := filepath.Join(viper.GetString("spider.path"), s.Spider.Name) + utils.CreateFilePath(path) + + fileName := filepath.Join(path, Md5File) + file := utils.OpenFile(fileName) + defer file.Close() + if file != nil { + if _, err := file.WriteString(md5 + "\n"); err != nil { + log.Errorf("file write string error: %s", err.Error()) + debug.PrintStack() + } + } +} + +// 获得下载锁的key +func (s *SpiderSync) GetLockDownloadKey(spiderId string) string { + node, _ := model.GetCurrentNode() + return node.Id.Hex() + "#" + spiderId +} + +// 删除本地文件 +func (s *SpiderSync) RemoveSpiderFile() { + path := filepath.Join( + viper.GetString("spider.path"), + s.Spider.Name, + ) + 
//爬虫文件有变化,先删除本地文件 + if err := os.RemoveAll(path); err != nil { + log.Errorf("remove spider files error: %s, path: %s", err.Error(), path) + debug.PrintStack() + } +} + +// 检测是否已经下载中 +func (s *SpiderSync) CheckDownLoading(spiderId string, fileId string) (bool, string) { + key := s.GetLockDownloadKey(spiderId) + if _, err := database.RedisClient.HGet("spider", key); err == nil { + return true, key + } + return false, key +} + +// 下载爬虫 +func (s *SpiderSync) Download() { + spiderId := s.Spider.Id.Hex() + fileId := s.Spider.FileId.Hex() + isDownloading, key := s.CheckDownLoading(spiderId, fileId) + if isDownloading { + return + } else { + _ = database.RedisClient.HSet("spider", key, key) + } + + session, gf := database.GetGridFs("files") + defer session.Close() + + f, err := gf.OpenId(bson.ObjectIdHex(fileId)) + defer f.Close() + if err != nil { + log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error()) + debug.PrintStack() + return + } + + // 生成唯一ID + randomId := uuid.NewV4() + tmpPath := viper.GetString("other.tmppath") + if !utils.Exists(tmpPath) { + if err := os.MkdirAll(tmpPath, 0777); err != nil { + log.Errorf("mkdir other.tmppath error: %v", err.Error()) + return + } + } + // 创建临时文件 + tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") + tmpFile := utils.OpenFile(tmpFilePath) + defer tmpFile.Close() + + // 将该文件写入临时文件 + if _, err := io.Copy(tmpFile, f); err != nil { + log.Errorf("copy file error: %s, file_id: %s", err.Error(), f.Id()) + debug.PrintStack() + return + } + + // 解压缩临时文件到目标文件夹 + dstPath := filepath.Join( + viper.GetString("spider.path"), + s.Spider.Name, + ) + if err := utils.DeCompress(tmpFile, dstPath); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + + // 关闭临时文件 + if err := tmpFile.Close(); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + + // 删除临时文件 + if err := os.Remove(tmpFilePath); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + 
return + } + + _ = database.RedisClient.HDel("spider", key) +} diff --git a/backend/services/spider_handler/spider_test.go b/backend/services/spider_handler/spider_test.go new file mode 100644 index 000000000..66d474551 --- /dev/null +++ b/backend/services/spider_handler/spider_test.go @@ -0,0 +1,53 @@ +package spider_handler + +import ( + "crawlab/config" + "crawlab/database" + "crawlab/model" + "github.com/apex/log" + "github.com/globalsign/mgo/bson" + "runtime/debug" + "testing" +) + +var s SpiderSync + +func init() { + if err := config.InitConfig("../../conf/config.yml"); err != nil { + log.Fatal("Init config failed") + } + log.Infof("初始化配置成功") + + // 初始化Mongodb数据库 + if err := database.InitMongo(); err != nil { + log.Error("init mongodb error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("初始化Mongodb数据库成功") + + // 初始化Redis数据库 + if err := database.InitRedis(); err != nil { + log.Error("init redis error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("初始化Redis数据库成功") + + s = SpiderSync{ + Spider: model.Spider{ + Id: bson.ObjectIdHex("5d8d8326bc3c4f000186e5df"), + Name: "scrapy-pre_sale", + FileId: bson.ObjectIdHex("5d8d8326bc3c4f000186e5db"), + Src: "/opt/crawlab/spiders/scrapy-pre_sale", + }, + } +} + +func TestSpiderSync_CreateMd5File(t *testing.T) { + s.CreateMd5File("this is md5") +} + +func TestSpiderSync_Download(t *testing.T) { + s.Download() +} diff --git a/backend/services/system.go b/backend/services/system.go index 2c7cd05a9..92f9cf962 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -3,17 +3,17 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/model" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" ) var SystemInfoChanMap = utils.NewChanMap() -func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { +func GetRemoteSystemInfo(id string) (sysInfo entity.SystemInfo, err error) { // 发送消息 - msg := 
msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeGetSystemInfo, NodeId: id, } @@ -21,7 +21,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { // 序列化 msgBytes, _ := json.Marshal(&msg) if _, err := database.RedisClient.Publish("nodes:"+id, utils.BytesToString(msgBytes)); err != nil { - return model.SystemInfo{}, err + return entity.SystemInfo{}, err } // 通道 @@ -38,7 +38,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { return sysInfo, nil } -func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) { +func GetSystemInfo(id string) (sysInfo entity.SystemInfo, err error) { if IsMasterNode(id) { sysInfo, err = model.GetLocalSystemInfo() } else { diff --git a/backend/services/task.go b/backend/services/task.go index 5f3a4d076..0e8db9649 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -3,9 +3,9 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "errors" @@ -139,33 +139,52 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e go func() { // 传入信号,此处阻塞 signal := <-ch - - if signal == constants.TaskCancel { + log.Infof("cancel process signal: %s", signal) + if signal == constants.TaskCancel && cmd.Process != nil { // 取消进程 if err := cmd.Process.Kill(); err != nil { - log.Errorf(err.Error()) + log.Errorf("process kill error: %s", err.Error()) debug.PrintStack() - return } t.Status = constants.StatusCancelled + } else { + // 保存任务 + t.Status = constants.StatusFinished } - - // 保存任务 t.FinishTs = time.Now() if err := t.Save(); err != nil { - log.Infof(err.Error()) + log.Infof("save task error: %s", err.Error()) debug.PrintStack() return } }() - // 开始执行 - if err := cmd.Run(); err != nil { - HandleTaskError(t, err) + // 异步启动进程 + if err := cmd.Start(); err != nil { + log.Errorf("start spider 
error:{}", err.Error()) + debug.PrintStack() return err } - ch <- constants.TaskFinish + // 保存pid到task + t.Pid = cmd.Process.Pid + if err := t.Save(); err != nil { + log.Errorf("save task pid error: %s", err.Error()) + debug.PrintStack() + return err + } + // 同步等待进程完成 + if err := cmd.Wait(); err != nil { + log.Errorf("wait process finish error: %s", err.Error()) + debug.PrintStack() + // 发生一次也需要保存 + t.Error = err.Error() + t.FinishTs = time.Now() + t.Status = constants.StatusFinished + _ = t.Save() + return err + } + ch <- constants.TaskFinish return nil } @@ -239,7 +258,7 @@ func ExecuteTask(id int) { tic := time.Now() // 获取当前节点 - node, err := GetCurrentNode() + node, err := model.GetCurrentNode() if err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return @@ -434,6 +453,8 @@ func CancelTask(id string) (err error) { // 获取任务 task, err := model.GetTask(id) if err != nil { + log.Errorf("task not found, task id : %s, error: %s", id, err.Error()) + debug.PrintStack() return err } @@ -443,24 +464,36 @@ func CancelTask(id string) (err error) { } // 获取当前节点(默认当前节点为主节点) - node, err := GetCurrentNode() + node, err := model.GetCurrentNode() if err != nil { + log.Errorf("get current node error: %s", err.Error()) + debug.PrintStack() return err } + log.Infof("current node id is: %s", node.Id.Hex()) + log.Infof("task node id is: %s", task.NodeId.Hex()) + if node.Id == task.NodeId { // 任务节点为主节点 // 获取任务执行频道 ch := utils.TaskExecChanMap.ChanBlocked(id) - - // 发出取消进程信号 - ch <- constants.TaskCancel + if ch != nil { + // 发出取消进程信号 + ch <- constants.TaskCancel + } else { + if err := model.UpdateTaskToAbnormal(node.Id); err != nil { + log.Errorf("update task to abnormal : {}", err.Error()) + debug.PrintStack() + return err + } + } } else { // 任务节点为工作节点 // 序列化消息 - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeCancelTask, TaskId: id, } diff --git a/backend/utils/file.go b/backend/utils/file.go index dda73c139..d65e7ab1b 100644 --- 
a/backend/utils/file.go +++ b/backend/utils/file.go @@ -2,6 +2,7 @@ package utils import ( "archive/zip" + "bufio" "github.com/apex/log" "io" "os" @@ -9,6 +10,49 @@ import ( "runtime/debug" ) +// 删除文件 +func RemoveFiles(path string) { + if err := os.RemoveAll(path); err != nil { + log.Errorf("remove files error: %s, path: %s", err.Error(), path) + debug.PrintStack() + } +} + +// 读取文件一行 +func ReadFileOneLine(fileName string) string { + file := OpenFile(fileName) + defer file.Close() + buf := bufio.NewReader(file) + line, err := buf.ReadString('\n') + if err != nil { + log.Errorf("read file error: %s", err.Error()) + return "" + } + return line + +} + +// 创建文件 +func OpenFile(fileName string) *os.File { + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, os.ModePerm) + if err != nil { + log.Errorf("create file error: %s, file_name: %s", err.Error(), fileName) + debug.PrintStack() + return nil + } + return file +} + +// 创建文件夹 +func CreateFilePath(filePath string) { + if !Exists(filePath) { + if err := os.MkdirAll(filePath, os.ModePerm); err != nil { + log.Errorf("create file error: %s, file_path: %s", err.Error(), filePath) + debug.PrintStack() + } + } +} + // 判断所给路径文件/文件夹是否存在 func Exists(path string) bool { _, err := os.Stat(path) //os.Stat获取文件信息 diff --git a/backend/utils/helpers.go b/backend/utils/helpers.go index 8e6de815f..edc6200ee 100644 --- a/backend/utils/helpers.go +++ b/backend/utils/helpers.go @@ -1,7 +1,55 @@ package utils -import "unsafe" +import ( + "context" + "crawlab/database" + "crawlab/entity" + "encoding/json" + "github.com/apex/log" + "github.com/gomodule/redigo/redis" + "runtime/debug" + "unsafe" +) func BytesToString(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } + +func GetJson(message entity.NodeMessage) string { + msgBytes, err := json.Marshal(&message) + if err != nil { + log.Errorf("node message to json error: %s", err.Error()) + debug.PrintStack() + return "" + } + return BytesToString(msgBytes) +} + +func 
GetMessage(message redis.Message) *entity.NodeMessage { + msg := entity.NodeMessage{} + if err := json.Unmarshal(message.Data, &msg); err != nil { + log.Errorf("message byte to object error: %s", err.Error()) + debug.PrintStack() + return nil + } + return &msg +} + +func Pub(channel string, msg entity.NodeMessage) error { + if _, err := database.RedisClient.Publish(channel, GetJson(msg)); err != nil { + log.Errorf("publish redis error: %s", err.Error()) + debug.PrintStack() + return err + } + return nil +} + +func Sub(channel string, consume database.ConsumeFunc) error { + ctx := context.Background() + if err := database.RedisClient.Subscribe(ctx, consume, channel); err != nil { + log.Errorf("subscribe redis error: %s", err.Error()) + debug.PrintStack() + return err + } + return nil +} diff --git a/frontend/package.json b/frontend/package.json index e3bc84f8a..20e40c7c4 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -3,7 +3,7 @@ "version": "0.2.3", "private": true, "scripts": { - "serve": "vue-cli-service serve --ip=0.0.0.0", + "serve": "vue-cli-service serve --ip=0.0.0.0 --mode=development", "serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0", "config": "vue ui", "build:dev": "vue-cli-service build --mode development", diff --git a/frontend/src/components/Common/DialogView.vue b/frontend/src/components/Common/DialogView.vue index 7976171e7..3c83d7ce8 100644 --- a/frontend/src/components/Common/DialogView.vue +++ b/frontend/src/components/Common/DialogView.vue @@ -151,7 +151,7 @@ export default { } }, mounted () { - if (!this.spiderList || !this.spiderList.length) this.$store.dispatch('spider/getSpiderList') + // if (!this.spiderList || !this.spiderList.length) this.$store.dispatch('spider/getSpiderList') if (!this.nodeList || !this.nodeList.length) this.$store.dispatch('node/getNodeList') } } diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue index 
39702a5dc..381b253c5 100644 --- a/frontend/src/components/InfoView/SpiderInfoView.vue +++ b/frontend/src/components/InfoView/SpiderInfoView.vue @@ -18,10 +18,10 @@ - + - + @@ -39,10 +39,11 @@ - - - - + + + + + @@ -102,16 +103,7 @@ export default { 'spiderForm' ]), isShowRun () { - if (this.isCustomized) { - // customized spider - return !!this.spiderForm.cmd - } else { - // configurable spider - return !!this.spiderForm.fields - } - }, - isCustomized () { - return this.spiderForm.type === 'customized' + return !!this.spiderForm.cmd } }, methods: { diff --git a/frontend/src/store/modules/spider.js b/frontend/src/store/modules/spider.js index b7bccd0d7..07a0bac35 100644 --- a/frontend/src/store/modules/spider.js +++ b/frontend/src/store/modules/spider.js @@ -4,6 +4,8 @@ const state = { // list of spiders spiderList: [], + spiderTotal: 0, + // active spider data spiderForm: {}, @@ -38,6 +40,9 @@ const state = { const getters = {} const mutations = { + SET_SPIDER_TOTAL (state, value) { + state.spiderTotal = value + }, SET_SPIDER_FORM (state, value) { state.spiderForm = value }, @@ -71,14 +76,11 @@ const mutations = { } const actions = { - getSpiderList ({ state, commit }) { - let params = {} - if (state.filterSite) { - params.site = state.filterSite - } + getSpiderList ({ state, commit }, params = {}) { return request.get('/spiders', params) .then(response => { - commit('SET_SPIDER_LIST', response.data.data) + commit('SET_SPIDER_LIST', response.data.data.list) + commit('SET_SPIDER_TOTAL', response.data.data.total) }) }, editSpider ({ state, dispatch }) { diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index 1d7e6c09f..bb182706f 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -139,7 +139,7 @@ const actions = { cancelTask ({ state, dispatch }, id) { return request.post(`/tasks/${id}/cancel`) .then(() => { - dispatch('getTaskData') + dispatch('getTaskData', id) }) } } diff --git 
a/frontend/src/views/result/ResultDetail.vue b/frontend/src/views/result/ResultDetail.vue index f42bee5c4..df8487ef8 100644 --- a/frontend/src/views/result/ResultDetail.vue +++ b/frontend/src/views/result/ResultDetail.vue @@ -59,7 +59,7 @@ export default { }, created () { // get the list of the spiders - this.$store.dispatch('spider/getSpiderList') + // this.$store.dispatch('spider/getSpiderList') // get spider basic info this.$store.dispatch('spider/getSpiderData', this.$route.params.id) diff --git a/frontend/src/views/result/ResultList.vue b/frontend/src/views/result/ResultList.vue index 85c700980..2f3e820bd 100644 --- a/frontend/src/views/result/ResultList.vue +++ b/frontend/src/views/result/ResultList.vue @@ -195,7 +195,7 @@ export default { this.dialogVisible = true }, onRefresh () { - this.$store.dispatch('spider/getSpiderList') + // this.$store.dispatch('spider/getSpiderList') }, onSubmit () { const vm = this @@ -257,7 +257,7 @@ export default { } }, created () { - this.$store.dispatch('spider/getSpiderList') + // this.$store.dispatch('spider/getSpiderList') } } diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 743a186e2..c44d46e2a 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -269,7 +269,7 @@ export default { }, created () { this.$store.dispatch('schedule/getScheduleList') - this.$store.dispatch('spider/getSpiderList') + // this.$store.dispatch('spider/getSpiderList') this.$store.dispatch('node/getNodeList') } } diff --git a/frontend/src/views/spider/SpiderDetail.vue b/frontend/src/views/spider/SpiderDetail.vue index 69fdd7705..916592f41 100644 --- a/frontend/src/views/spider/SpiderDetail.vue +++ b/frontend/src/views/spider/SpiderDetail.vue @@ -87,7 +87,7 @@ export default { }, created () { // get the list of the spiders - this.$store.dispatch('spider/getSpiderList') + // this.$store.dispatch('spider/getSpiderList') // get spider 
basic info this.$store.dispatch('spider/getSpiderData', this.$route.params.id) diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 6ff5cb35b..743aabbe9 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -108,19 +108,20 @@ - - - - - - - - + + + + + + + + + + + + @@ -143,7 +144,7 @@ - - {{$t('Configurable')}} - {{$t('Customized')}} + {{scope.row.type === 'customized' ? '自定义' : scope.row.type}} + :total="spiderTotal"> @@ -248,7 +248,7 @@ import { import dayjs from 'dayjs' import CrawlConfirmDialog from '../../components/Common/CrawlConfirmDialog' import StatusTag from '../../components/Status/StatusTag' - +import request from '../../api/request' export default { name: 'SpiderList', components: { @@ -258,7 +258,7 @@ export default { data () { return { pagination: { - pageNum: 0, + pageNum: 1, pageSize: 10 }, importLoading: false, @@ -271,21 +271,18 @@ export default { crawlConfirmDialogVisible: false, activeSpiderId: undefined, filter: { - keyword: '' + keyword: '', + type: '' }, + types: [], // tableData, columns: [ - { name: 'name', label: 'Name', width: '160', align: 'left' }, - // { name: 'site_name', label: 'Site', width: '140', align: 'left' }, + { name: 'display_name', label: 'Name', width: '160', align: 'left' }, { name: 'type', label: 'Spider Type', width: '120' }, - // { name: 'cmd', label: 'Command Line', width: '200' }, { name: 'last_status', label: 'Last Status', width: '120' }, { name: 'last_run_ts', label: 'Last Run', width: '140' }, - { name: 'create_ts', label: 'Create Time', width: '140' }, - { name: 'update_ts', label: 'Update Time', width: '140' }, + // { name: 'update_ts', label: 'Update Time', width: '140' }, { name: 'remark', label: 'Remark', width: '140' } - // { name: 'last_7d_tasks', label: 'Last 7-Day Tasks', width: '80' }, - // { name: 'last_5_errors', label: 'Last 5-Run Errors', width: '80' } ], spiderFormRules: { name: [{ required: true, 
message: 'Required Field', trigger: 'change' }] @@ -297,45 +294,28 @@ export default { ...mapState('spider', [ 'importForm', 'spiderList', - 'spiderForm' + 'spiderForm', + 'spiderTotal' ]), ...mapGetters('user', [ 'token' - ]), - filteredTableData () { - return this.spiderList - .filter(d => { - if (this.filterSite) { - return d.site === this.filterSite - } - return true - }) - .filter((d, index) => { - return (this.pagination.pageSize * (this.pagination.pageNum - 1)) <= index && (index < this.pagination.pageSize * this.pagination.pageNum) - }) - // .filter(d => { - // if (!this.filter.keyword) return true - // for (let i = 0; i < this.columns.length; i++) { - // const colName = this.columns[i].name - // if (d[colName] && d[colName].toLowerCase().indexOf(this.filter.keyword.toLowerCase()) > -1) { - // return true - // } - // } - // return false - // }) - }, - filterSite: { - get () { - return this.$store.state.spider.filterSite - }, - set (value) { - this.$store.commit('spider/SET_FILTER_SITE', value) - } - } + ]) }, methods: { - onSearch (value) { - console.log(value) + onSpiderTypeChange (val) { + this.filter.type = val + this.getList() + }, + onPageSizeChange (val) { + this.pagination.pageSize = val + this.getList() + }, + onPageNumChange (val) { + this.pagination.pageNum = val + this.getList() + }, + onSearch () { + this.getList() }, onAdd () { // this.addDialogVisible = true @@ -353,7 +333,7 @@ export default { this.$st.sendEv('爬虫', '添加爬虫-自定义爬虫') }, onRefresh () { - this.$store.dispatch('spider/getSpiderList') + this.getList() this.$st.sendEv('爬虫', '刷新') }, onSubmit () { @@ -376,9 +356,6 @@ export default { this.$store.commit('spider/SET_SPIDER_FORM', {}) this.dialogVisible = false }, - onAddCancel () { - this.addDialogVisible = false - }, onDialogClose () { this.$store.commit('spider/SET_SPIDER_FORM', {}) this.dialogVisible = false @@ -422,9 +399,6 @@ export default { this.$router.push('/spiders/' + row._id) this.$st.sendEv('爬虫', '查看') }, - onPageChange () { 
- this.$store.dispatch('spider/getSpiderList') - }, onImport () { this.$refs.importForm.validate(valid => { if (valid) { @@ -433,7 +407,7 @@ export default { this.$store.dispatch('spider/importGithub') .then(response => { this.$message.success('Import repo successfully') - this.$store.dispatch('spider/getSpiderList') + this.getList() }) .catch(response => { this.$message.error(response.data.error) @@ -501,7 +475,7 @@ export default { // fetch spider list setTimeout(() => { - this.$store.dispatch('spider/getSpiderList') + this.getList() }, 500) // close popup @@ -515,14 +489,26 @@ export default { if (column.label !== this.$t('Action')) { this.onView(row) } + }, + getList () { + let params = { + pageNum: this.pagination.pageNum, + pageSize: this.pagination.pageSize, + keyword: this.filter.keyword, + type: this.filter.type + } + this.$store.dispatch('spider/getSpiderList', params) + }, + getTypes () { + request.get(`/spider/types`).then(resp => { + this.types = resp.data.data + }) } }, created () { - // take site from params to filter - this.$store.commit('spider/SET_FILTER_SITE', this.$route.params.domain) - + this.getTypes() // fetch spider list - this.$store.dispatch('spider/getSpiderList') + this.getList() }, mounted () { } diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 5ad1b14fb..9db3623d0 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -4,28 +4,28 @@ - - - - - - - - {{$t('Search')}} - + + + + + + + + + + + + + + + + + + + + + +