Mirror of https://github.com/seaweedfs/seaweedfs.git
commit f282ed444b (parent d8f5985e5e)

    refactoring
@@ -85,11 +85,15 @@ type ChunkView struct {
     Offset      int64
     Size        uint64
     LogicOffset int64
-    IsFullChunk bool
+    ChunkSize   uint64
     CipherKey   []byte
     IsGzipped   bool
 }

+func (cv *ChunkView) IsFullChunk() bool {
+    return cv.Size == cv.ChunkSize
+}
+
 func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {

     visibles := NonOverlappingVisibleIntervals(chunks)
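The stored IsFullChunk flag becomes a derived property: a view now records the size of its backing chunk, and fullness is computed on demand, so the flag can never drift out of sync with the sizes. A minimal standalone sketch of the new invariant (struct trimmed to the fields this refactoring touches):

package main

import "fmt"

// ChunkView, trimmed to the relevant fields.
type ChunkView struct {
    Offset    int64  // read start within the backing chunk
    Size      uint64 // bytes this view covers
    ChunkSize uint64 // total bytes in the backing chunk
}

// IsFullChunk is now derived: the view is the whole chunk exactly
// when it covers every byte the chunk holds.
func (cv *ChunkView) IsFullChunk() bool {
    return cv.Size == cv.ChunkSize
}

func main() {
    full := ChunkView{Offset: 0, Size: 2048, ChunkSize: 2048}
    tail := ChunkView{Offset: 1024, Size: 1024, ChunkSize: 2048}
    fmt.Println(full.IsFullChunk(), tail.IsFullChunk()) // true false
}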
@@ -111,13 +115,12 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
     for _, chunk := range visibles {

         if chunk.start <= offset && offset < chunk.stop && offset < stop {
-            isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
             views = append(views, &ChunkView{
                 FileId:      chunk.fileId,
                 Offset:      offset - chunk.start, // offset is the data starting location in this file id
                 Size:        uint64(min(chunk.stop, stop) - offset),
                 LogicOffset: offset,
-                IsFullChunk: isFullChunk,
+                ChunkSize:   chunk.chunkSize,
                 CipherKey:   chunk.cipherKey,
                 IsGzipped:   chunk.isGzipped,
             })
@@ -146,7 +149,7 @@ var bufPool = sync.Pool{

 func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {

-    newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped)
+    newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped)

     length := len(visibles)
     if length == 0 {
@@ -160,11 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
     logPrintf(" before", visibles)
     for _, v := range visibles {
         if v.start < chunk.Offset && chunk.Offset < v.stop {
-            newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
+            newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
         }
         chunkStop := chunk.Offset + int64(chunk.Size)
         if v.start < chunkStop && chunkStop < v.stop {
-            newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
+            newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
         }
         if chunkStop <= v.start || v.stop <= chunk.Offset {
             newVisibles = append(newVisibles, v)
@@ -216,18 +219,18 @@ type VisibleInterval struct {
     stop         int64
     modifiedTime int64
     fileId       string
-    isFullChunk  bool
+    chunkSize    uint64
     cipherKey    []byte
     isGzipped    bool
 }

-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
     return VisibleInterval{
         start:        start,
         stop:         stop,
         fileId:       fileId,
         modifiedTime: modifiedTime,
-        isFullChunk:  isFullChunk,
+        chunkSize:    chunkSize,
         cipherKey:    cipherKey,
         isGzipped:    isGzipped,
     }
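For context on the two append branches above: when a newer chunk overlaps an existing visible interval, the older interval is trimmed to the uncovered remainders on either side. A standalone sketch of that splitting rule, under the same half-open start/stop conventions:

package main

import "fmt"

type interval struct{ start, stop int64 }

// split mirrors the three branches in MergeIntoVisibles: keep the left
// and right remainders of v that the new chunk does not cover, and keep
// v whole when there is no overlap at all.
func split(v interval, chunkStart, chunkStop int64) (out []interval) {
    if v.start < chunkStart && chunkStart < v.stop {
        out = append(out, interval{v.start, chunkStart}) // left remainder
    }
    if v.start < chunkStop && chunkStop < v.stop {
        out = append(out, interval{chunkStop, v.stop}) // right remainder
    }
    if chunkStop <= v.start || v.stop <= chunkStart {
        out = append(out, v) // disjoint: the old interval survives intact
    }
    return out
}

func main() {
    // A chunk covering [30,70) punches a hole in a [0,100) interval.
    fmt.Println(split(interval{0, 100}, 30, 70)) // [{0 30} {70 100}]
}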
@@ -106,7 +106,7 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
     // fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))

     hasDataInCache := false
-    chunkData := c.chunkCache.GetChunk(chunkView.FileId)
+    chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
     if chunkData != nil {
         glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
         hasDataInCache = true
@@ -31,7 +31,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
     for _, chunkView := range chunkViews {

         urlString := fileId2Url[chunkView.FileId]
-        err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+        err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
             w.Write(data)
         })
         if err != nil {
@@ -128,7 +128,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
         return err
     }
     var buffer bytes.Buffer
-    err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+    err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
         buffer.Write(data)
     })
     if err != nil {
@@ -115,7 +115,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
     }

     var writeErr error
-    readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+    readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
         _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
     })

@@ -103,7 +103,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
     }

     var writeErr error
-    readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+    readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
         _, err := writer.Write(data)
         if err != nil {
             writeErr = err
@@ -101,7 +101,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
         return err
     }

-    err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+    err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
         wc.Write(data)
     })

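All of these ReadUrlAsStream call sites change the same way: the former field read chunk.IsFullChunk becomes the method call chunk.IsFullChunk(), so the compiler pinpoints any caller that was missed. A condensed sketch of the shared pattern (streamChunk is a hypothetical helper, not in the repository; util.ReadUrlAsStream's argument order is taken from the calls above):

// streamChunk shows the common shape of the updated call sites.
func streamChunk(urlString string, cv *ChunkView, w io.Writer) error {
    var writeErr error
    readErr := util.ReadUrlAsStream(urlString, cv.CipherKey, cv.IsGzipped,
        cv.IsFullChunk(), // was: cv.IsFullChunk, a plain field read
        cv.Offset, int(cv.Size), func(data []byte) {
            _, writeErr = w.Write(data)
        })
    if readErr != nil {
        return readErr
    }
    return writeErr
}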
@@ -1,52 +1,39 @@
 package chunk_cache

 import (
-    "fmt"
-    "path"
-    "sort"
     "sync"

     "github.com/chrislusf/seaweedfs/weed/glog"
-    "github.com/chrislusf/seaweedfs/weed/storage"
     "github.com/chrislusf/seaweedfs/weed/storage/needle"
 )

+const (
+    memCacheSizeLimit = 1024 * 1024
+)
+
 // a global cache for recently accessed file chunks
 type ChunkCache struct {
-    memCache   *ChunkCacheInMemory
-    diskCaches []*ChunkCacheVolume
+    memCache  *ChunkCacheInMemory
+    diskCache *OnDiskCacheLayer
     sync.RWMutex
 }

 func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
-    c := &ChunkCache{
-        memCache: NewChunkCacheInMemory(maxEntries),
-    }

     volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
     if volumeCount < segmentCount {
         volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
     }

-    for i := 0; i < volumeCount; i++ {
-        fileName := path.Join(dir, fmt.Sprintf("cache_%d", i))
-        diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
-        if err != nil {
-            glog.Errorf("failed to add cache %s : %v", fileName, err)
-        } else {
-            c.diskCaches = append(c.diskCaches, diskCache)
-        }
-    }
+    c := &ChunkCache{
+        memCache:  NewChunkCacheInMemory(maxEntries),
+        diskCache: NewOnDiskCacheLayer(dir, "cache", volumeCount, volumeSize),
+    }

-    // keep newest cache to the front
-    sort.Slice(c.diskCaches, func(i, j int) bool {
-        return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
-    })
-
     return c
 }

-func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
+func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
     if c == nil {
         return
     }
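The constructor's sizing arithmetic, for reference: the disk budget is carved into 30000MB volumes, but never fewer than segmentCount of them. A small standalone sketch of just that computation:

package main

import "fmt"

// volumes reproduces the sizing logic in NewChunkCache above.
func volumes(diskSizeMB int64, segmentCount int) (count int, sizeMB int64) {
    count, sizeMB = int(diskSizeMB/30000), int64(30000)
    if count < segmentCount {
        count, sizeMB = segmentCount, diskSizeMB/int64(segmentCount)
    }
    return
}

func main() {
    fmt.Println(volumes(90000, 2)) // 3 volumes of 30000MB each
    fmt.Println(volumes(4000, 4))  // 4 volumes of 1000MB each
}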
@@ -54,12 +41,15 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
     c.RLock()
     defer c.RUnlock()

-    return c.doGetChunk(fileId)
+    return c.doGetChunk(fileId, chunkSize)
 }

-func (c *ChunkCache) doGetChunk(fileId string) (data []byte) {
-    if data = c.memCache.GetChunk(fileId); data != nil {
-        return data
+func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+
+    if chunkSize < memCacheSizeLimit {
+        if data = c.memCache.GetChunk(fileId); data != nil {
+            return data
+        }
     }

     fid, err := needle.ParseFileIdFromString(fileId)
@@ -67,20 +57,9 @@ func (c *ChunkCache) doGetChunk(fileId string) (data []byte) {
         glog.Errorf("failed to parse file id %s", fileId)
         return nil
     }
-    for _, diskCache := range c.diskCaches {
-        data, err = diskCache.GetNeedle(fid.Key)
-        if err == storage.ErrorNotFound {
-            continue
-        }
-        if err != nil {
-            glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId)
-            continue
-        }
-        if len(data) != 0 {
-            return
-        }
-    }
-    return nil
+
+    return c.diskCache.getChunk(fid.Key)
+
 }

 func (c *ChunkCache) SetChunk(fileId string, data []byte) {
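Putting the two read-path hunks together, the new doGetChunk reads as a two-tier lookup: the memory tier is consulted only for chunks small enough to have been cached there, and everything else goes straight to the disk layer. Roughly (assembled from the hunks above, not copied verbatim):

func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
    if chunkSize < memCacheSizeLimit { // only small chunks live in memory
        if data = c.memCache.GetChunk(fileId); data != nil {
            return data
        }
    }
    fid, err := needle.ParseFileIdFromString(fileId)
    if err != nil {
        glog.Errorf("failed to parse file id %s", fileId)
        return nil
    }
    return c.diskCache.getChunk(fid.Key)
}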
@@ -95,22 +74,8 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) {

 func (c *ChunkCache) doSetChunk(fileId string, data []byte) {

-    c.memCache.SetChunk(fileId, data)
-
-    if len(c.diskCaches) == 0 {
-        return
-    }
-
-    if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
-        t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
-        if resetErr != nil {
-            glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
-            return
-        }
-        for i := len(c.diskCaches) - 1; i > 0; i-- {
-            c.diskCaches[i] = c.diskCaches[i-1]
-        }
-        c.diskCaches[0] = t
-    }
+    if len(data) < memCacheSizeLimit {
+        c.memCache.SetChunk(fileId, data)
+    }

     fid, err := needle.ParseFileIdFromString(fileId)
@@ -118,7 +83,8 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
         glog.Errorf("failed to parse file id %s", fileId)
         return
     }
-    c.diskCaches[0].WriteNeedle(fid.Key, data)
+
+    c.diskCache.setChunk(fid.Key, data)

 }

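The write path mirrors the read path: only sub-limit chunks are copied into the memory tier, every chunk is written through to disk, and the volume rotation that used to live here moves wholesale into OnDiskCacheLayer (the new file below). Assembled from the two hunks above:

func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
    if len(data) < memCacheSizeLimit { // size-gated, like the read path
        c.memCache.SetChunk(fileId, data)
    }
    fid, err := needle.ParseFileIdFromString(fileId)
    if err != nil {
        glog.Errorf("failed to parse file id %s", fileId)
        return
    }
    c.diskCache.setChunk(fid.Key, data)
}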
@@ -128,7 +94,5 @@ func (c *ChunkCache) Shutdown() {
     }
     c.Lock()
     defer c.Unlock()
-    for _, diskCache := range c.diskCaches {
-        diskCache.Shutdown()
-    }
+    c.diskCache.shutdown()
 }
@@ -23,6 +23,7 @@ func TestOnDisk(t *testing.T) {
     type test_data struct {
         data   []byte
         fileId string
+        size   uint64
     }
     testData := make([]*test_data, writeCount)
     for i := 0; i < writeCount; i++ {
@@ -31,12 +32,13 @@ func TestOnDisk(t *testing.T) {
         testData[i] = &test_data{
             data:   buff,
             fileId: fmt.Sprintf("1,%daabbccdd", i+1),
+            size:   uint64(len(buff)),
         }
         cache.SetChunk(testData[i].fileId, testData[i].data)
     }

     for i := 0; i < writeCount; i++ {
-        data := cache.GetChunk(testData[i].fileId)
+        data := cache.GetChunk(testData[i].fileId, testData[i].size)
         if bytes.Compare(data, testData[i].data) != 0 {
             t.Errorf("failed to write to and read from cache: %d", i)
         }
@@ -47,7 +49,7 @@ func TestOnDisk(t *testing.T) {
     cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount)

     for i := 0; i < writeCount; i++ {
-        data := cache.GetChunk(testData[i].fileId)
+        data := cache.GetChunk(testData[i].fileId, testData[i].size)
         if bytes.Compare(data, testData[i].data) != 0 {
             t.Errorf("failed to write to and read from cache: %d", i)
         }
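The updated test doubles as usage documentation for the new signature: callers remember the size they wrote so they can pass it back on reads. A hypothetical round trip along the same lines (values are made up; the fileId format follows the test above):

cache := NewChunkCache(1000, tmpDir, totalDiskSizeMb, segmentCount)
payload := []byte("hello chunk")
cache.SetChunk("1,1aabbccdd", payload)
got := cache.GetChunk("1,1aabbccdd", uint64(len(payload))) // nil on a miss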
weed/util/chunk_cache/on_disk_cache_layer.go (new file, 83 lines)
@@ -0,0 +1,83 @@
+package chunk_cache
+
+import (
+    "fmt"
+    "path"
+    "sort"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/storage"
+    "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type OnDiskCacheLayer struct {
+    diskCaches []*ChunkCacheVolume
+}
+
+func NewOnDiskCacheLayer(dir, namePrefix string, volumeCount int, volumeSize int64) *OnDiskCacheLayer {
+    c := &OnDiskCacheLayer{}
+    for i := 0; i < volumeCount; i++ {
+        fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
+        diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+        if err != nil {
+            glog.Errorf("failed to add cache %s : %v", fileName, err)
+        } else {
+            c.diskCaches = append(c.diskCaches, diskCache)
+        }
+    }
+
+    // keep newest cache to the front
+    sort.Slice(c.diskCaches, func(i, j int) bool {
+        return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+    })
+
+    return c
+}
+
+func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
+
+    if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+        t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+        if resetErr != nil {
+            glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+            return
+        }
+        for i := len(c.diskCaches) - 1; i > 0; i-- {
+            c.diskCaches[i] = c.diskCaches[i-1]
+        }
+        c.diskCaches[0] = t
+    }
+
+    c.diskCaches[0].WriteNeedle(needleId, data)
+
+}
+
+func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) {
+
+    var err error
+
+    for _, diskCache := range c.diskCaches {
+        data, err = diskCache.GetNeedle(needleId)
+        if err == storage.ErrorNotFound {
+            continue
+        }
+        if err != nil {
+            glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
+            continue
+        }
+        if len(data) != 0 {
+            return
+        }
+    }
+
+    return nil
+
+}
+
+func (c *OnDiskCacheLayer) shutdown() {
+
+    for _, diskCache := range c.diskCaches {
+        diskCache.Shutdown()
+    }
+
+}