From 0429d8465a488d1d88bac35b1a4fa927d99c0284 Mon Sep 17 00:00:00 2001
From: Sam Liddicott
Date: Fri, 7 Nov 2008 15:39:58 +0000
Subject: [PATCH] Basic validate-on-read

Try to validate the cached file in large chunks of 10M or more.
No pipelining on the server side yet.
---
 source4/ntvfs/proxy/vfs_proxy.c | 260 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 258 insertions(+), 2 deletions(-)

diff --git a/source4/ntvfs/proxy/vfs_proxy.c b/source4/ntvfs/proxy/vfs_proxy.c
index 3440802f849..5f9399cc4f2 100644
--- a/source4/ntvfs/proxy/vfs_proxy.c
+++ b/source4/ntvfs/proxy/vfs_proxy.c
@@ -910,9 +910,12 @@ NTSTATUS async_read_cache_save(struct async_info *async, void* io1, void* io2, NTSTATUS status)
 		DEBUG(3,("%s megavalidate succeeded, validate to %lld\n",__FUNCTION__,
 			 io->generic.in.offset + io->generic.out.nread));
 		cache_handle_validated(f, io->generic.in.offset + io->generic.out.nread);
-	} else cache_handle_save(f, io->generic.out.data,
+	} else {
+		DEBUG(5,("Not a mega-validate, save %d in cache\n",io->generic.out.nread));
+		cache_handle_save(f, io->generic.out.data,
 			  io->generic.out.nread,
 			  io->generic.in.offset);
+	}
 
 	DEBUG(3,("%s finished %s\n",__FUNCTION__, get_friendly_nt_error_msg(status)));
 	return status;
@@ -1228,6 +1231,245 @@ DEBUG(5,("D\n"));
 	return NT_STATUS_OK;
 }
 
+struct proxy_validate_parts_parts {
+	struct proxy_Read* r;
+	struct ntvfs_request *req;
+	struct proxy_file *f;
+	struct async_read_fragments *fragments;
+	off_t offset;
+	ssize_t remaining;
+	bool complete;
+	declare_checksum(digest);
+	struct MD5Context context;
+};
+
+NTSTATUS proxy_validate_complete(struct proxy_validate_parts_parts *parts);
+NTSTATUS async_proxy_validate_parts(struct async_info *async, void* io1, void* io2, NTSTATUS status);
+static NTSTATUS proxy_validate_parts(struct ntvfs_module_context *ntvfs,
+				     struct proxy_validate_parts_parts *parts);
+
+/* this will be the new struct proxy_Read based read function; for now
+   it just handles the non-cache-based validate against a regular server */
+static NTSTATUS proxy_validate(struct ntvfs_module_context *ntvfs,
+			       struct ntvfs_request *req,
+			       struct proxy_Read *r,
+			       union smb_handle *file)
+{
+	struct proxy_private *private = ntvfs->private_data;
+	struct proxy_validate_parts_parts *parts;
+	struct async_read_fragments *fragments;
+	struct proxy_file *f;
+	NTSTATUS status;
+
+	f = ntvfs_handle_get_backend_data(file->ntvfs, ntvfs);
+	if (!f) return NT_STATUS_INVALID_HANDLE;
+	r->in.fnum = f->fnum;
+
+	DEBUG(5,("%s: fnum=%d\n",__FUNCTION__,f->fnum));
+
+	parts = talloc_zero(req, struct proxy_validate_parts_parts);
+	DEBUG(5,("%s: parts=%p\n",__FUNCTION__,parts));
+	NT_STATUS_HAVE_NO_MEMORY(parts);
+
+	fragments = talloc_zero(parts, struct async_read_fragments);
+	NT_STATUS_HAVE_NO_MEMORY(fragments);
+
+	parts->fragments=fragments;
+
+	parts->r=r;
+	parts->f=f;
+	parts->req=req;
+	/* processed offset */
+	parts->offset=r->in.offset;
+	parts->remaining=r->in.maxcnt;
+	fragments->async=true;
+
+	MD5Init(&parts->context);
+
+	/* start a read loop which will continue in the callback until it is
+	   all done */
+	status=proxy_validate_parts(ntvfs, parts);
+	if (parts->complete) {
+		/* Make sure we are not async */
+		DEBUG(5,("%s: completed EARLY\n",__FUNCTION__));
+		return proxy_validate_complete(parts);
+	}
+
+	/* Assert: if status!=NT_STATUS_OK then parts->complete==true */
+	req->async_states->state |= NTVFS_ASYNC_STATE_ASYNC;
+	DEBUG(5,("%s: returning ASYNC\n",__FUNCTION__));
+	return status;
+}
+
+NTSTATUS proxy_validate_complete(struct proxy_validate_parts_parts *parts)
+{
+	NTSTATUS status;
+	struct proxy_Read* r=parts->r;
+	MD5Final(parts->digest, &parts->context);
+
+	status = parts->fragments->status;
+	r->out.result = status;
+	r->out.response.generic.count=r->out.nread;
+
+	DEBUG(5,("%s: %s nread=%d\n",__FUNCTION__, get_friendly_nt_error_msg(status),
+		 r->out.response.generic.count));
+
+	DEBUG(5,("Anticipated validated digest for size: %lld\n", (long long) r->in.maxcnt));
+	dump_data(5, r->in.digest.digest, sizeof(parts->digest));
+	DEBUG(5,("Read digest for size %lld\n",(long long) parts->offset));
+	dump_data(5, parts->digest, sizeof(parts->digest));
+
+	if (NT_STATUS_IS_OK(status) &&
+	    (memcmp(parts->digest, r->in.digest.digest, sizeof(parts->digest))==0)) {
+		r->out.flags = PROXY_USE_CACHE | PROXY_VALIDATE;
+		DEBUG(5,("======= VALIDATED FINE \n\n\n"));
+	} else if (r->in.flags & PROXY_USE_ZLIB) {
+		ssize_t size = r->out.response.generic.count;
+		DEBUG(5,("======= VALIDATED WRONG \n\n\n"));
+		if (compress_block(r->out.response.generic.data, &size)) {
+			r->out.flags|=PROXY_USE_ZLIB;
+			r->out.response.compress.count=size;
+			r->out.response.compress.data=r->out.response.generic.data;
+			DEBUG(3,("%s: Compressed from %d to %d = %d%%\n",
+				 __FUNCTION__,r->out.nread,size,size*100/r->out.nread));
+		}
+	}
+
+	/* assert: this must only be true if we are in a callback */
+	if (parts->req->async_states->state & NTVFS_ASYNC_STATE_ASYNC) {
+		/* we completed asynchronously, so we need to call the send_fn */
+		parts->req->async_states->status=status;
+		DEBUG(5,("Fragments async response sending\n"));
+
+		parts->req->async_states->send_fn(parts->req);
+		return NT_STATUS_OK;
+	}
+	return status;
+}
+
+NTSTATUS async_proxy_validate_parts(struct async_info *async, void* io1, void* io2, NTSTATUS status)
+{
+	struct smbcli_request *c_req = async->c_req;
+	struct ntvfs_request *req = async->req;
+	struct proxy_file *f = async->f;
+	struct ntvfs_module_context *ntvfs = async->proxy->ntvfs;
+	struct async_read_fragment* fragment=talloc_get_type_abort(io2, struct async_read_fragment);
+	/* this is the io against which the fragment is to be applied */
+	struct proxy_validate_parts_parts *parts = talloc_get_type_abort(io1, struct proxy_validate_parts_parts);
+	struct proxy_Read* r=parts->r;
+	/* this is the io for the read that issued the callback */
+	union smb_read *io_frag = fragment->io_frag;
+	struct async_read_fragments* fragments=fragment->fragments;
+
+	DEBUG(5,("%s: parts=%p c_req=%p io_frag=%p\n",__FUNCTION__,parts, c_req, io_frag));
+	/* if the request has not already been received by a chained handler, read it */
+	if (c_req) status=smb_raw_read_recv(c_req, io_frag);
+	DEBUG(5,("%s: status %s\n",__FUNCTION__,get_friendly_nt_error_msg(status)));
+
+	fragment->status=status;
+
+	if (NT_STATUS_IS_OK(status)) {
+		/* TODO: if this fragment is not sequentially "next", queue it
+		   until we can process it in order */
+		/* log this data in r->out.generic.data */
+		ssize_t extent = io_frag->generic.in.offset + io_frag->generic.out.nread;
+		/* Find the memcpy window, copy data from the io_frag to the io */
+		off_t start_offset=MAX(io_frag->generic.in.offset, r->in.offset);
+		/* Don't want to go past mincnt */
+		off_t io_extent=r->in.offset + r->in.mincnt;
+		off_t end_offset=MIN(io_extent, extent);
+
+		/* ASSERT(start_offset <= end_offset) */
+		/* ASSERT(start_offset <= io_extent) */
+		if (! (start_offset >= io_extent)) {
+			uint8_t* dst=r->out.response.generic.data + (start_offset - r->in.offset);
+			uint8_t* src=io_frag->generic.out.data+(start_offset - io_frag->generic.in.offset);
+			/* src == dst in cases where we did not latch onto someone
+			   else's read, but are handling our own */
+			if (src != dst)
+				memcpy(dst, src, end_offset - start_offset);
+			r->out.nread=end_offset - r->in.offset;
+		}
+
+		MD5Update(&parts->context, io_frag->generic.out.data,
+			  io_frag->generic.out.nread);
+
+		parts->fragments->status=status;
+		status=proxy_validate_parts(ntvfs, parts);
+	} else {
+		parts->fragments->status=status;
+	}
+
+	DLIST_REMOVE(fragments->fragments, fragment);
+	/* this will free the io_frag too */
+	talloc_free(fragment);
+
+	if (parts->complete || NT_STATUS_IS_ERR(status)) {
+		/* this will call send_fn; the chain handler won't know, but
+		   there should be no more handlers queued */
+		return proxy_validate_complete(parts);
+	}
+
+	return NT_STATUS_OK;
+}
+
+/* continue a read loop, possibly from a callback */
+static NTSTATUS proxy_validate_parts(struct ntvfs_module_context *ntvfs,
+				     struct proxy_validate_parts_parts *parts)
+{
+	struct proxy_private *private = ntvfs->private_data;
+	union smb_read *io_frag;
+	struct async_read_fragment *fragment;
+	struct smbcli_request *c_req = NULL;
+	ssize_t size=private->tree->session->transport->negotiate.max_xmit -
+		(MIN_SMB_SIZE+32);
+
+	/* Have we already read enough? */
+	if (parts->offset >= (parts->r->in.offset + parts->r->in.maxcnt)) {
+		parts->complete=true;
+		return NT_STATUS_OK;
+	}
+
+	size=MIN(size, parts->remaining);
+
+	fragment=talloc_zero(parts->fragments, struct async_read_fragment);
+	NT_STATUS_HAVE_NO_MEMORY(fragment);
+
+	io_frag = talloc_zero(fragment, union smb_read);
+	NT_STATUS_HAVE_NO_MEMORY(io_frag);
+
+	io_frag->generic.out.data = talloc_size(io_frag, size);
+	NT_STATUS_HAVE_NO_MEMORY(io_frag->generic.out.data);
+
+	io_frag->generic.level = RAW_READ_GENERIC;
+	io_frag->generic.in.file.fnum = parts->r->in.fnum;
+	io_frag->generic.in.offset = parts->offset;
+	io_frag->generic.in.mincnt = size;
+	io_frag->generic.in.maxcnt = size;
+	io_frag->generic.in.remaining = 0;
+#warning maybe true is more permissive?
+	io_frag->generic.in.read_for_execute = false;
+
+	//c_req = smb_raw_read_send(ntvfs, io_frag, parts->f, parts->r);
+	c_req = smb_raw_read_send(private->tree, io_frag);
+	NT_STATUS_HAVE_NO_MEMORY(c_req);
+
+	parts->offset+=size;
+	parts->remaining-=size;
+	fragment->c_req = c_req;
+	fragment->io_frag = io_frag;
+	fragment->fragments=parts->fragments;
+	DLIST_ADD(parts->fragments->fragments, fragment);
+
+	{
+		void* req=NULL;
+		ADD_ASYNC_RECV_TAIL(c_req, parts, fragment, parts->f, async_proxy_validate_parts, NT_STATUS_INTERNAL_ERROR);
+		ASYNC_RECV_TAIL_F_ORPHAN(io_frag, async_read_handler, parts->f, c_req->async.private, NT_STATUS_UNSUCCESSFUL);
+	}
+
+	DEBUG(5,("%s: issued read parts=%p c_req=%p io_frag=%p\n",__FUNCTION__,parts, c_req, io_frag));
+
+	return NT_STATUS_OK;
+}
+
 /* read from a file */
@@ -1395,7 +1637,7 @@ static NTSTATUS proxy_read(struct ntvfs_module_context *ntvfs,
 	   io->generic.in.mincnt == io->generic.in.maxcnt is to make sure we
 	   don't do a validate on a receive validate read
 	*/
-	if (PROXY_REMOTE_SERVER(private) &&
+	if (private->cache_validatesize && PROXY_REMOTE_SERVER(private) &&
 	    next_offset >= limit && (f->cache && f->cache->status & CACHE_VALIDATE)) {
 		ssize_t length=private->cache_validatesize;
 		declare_checksum(digest);
@@ -1410,6 +1652,8 @@ static NTSTATUS proxy_read(struct ntvfs_module_context *ntvfs,
 			/* upgrade the read, allocate the proxy_read struct here and fill in the
 			   extras, no more out-of-band stuff */
 			DEBUG(5,("%s: Promoting to validate read: %lld\n",__FUNCTION__,(long long) length));
+			dump_data(5, digest, sizeof(digest));
+
 			r=talloc_zero(io_frag, struct proxy_Read);
 			memcpy(r->in.digest.digest, digest, sizeof(digest));
 			r->in.flags |= PROXY_VALIDATE | PROXY_USE_CACHE;
@@ -2261,6 +2505,8 @@ static NTSTATUS rpclite_proxy_Read(struct ntvfs_module_context *ntvfs,
 	struct proxy_private *private = ntvfs->private_data;
 	union smb_read* io=talloc(req, union smb_read);
 	NTSTATUS status;
+
+	NT_STATUS_HAVE_NO_MEMORY(io);
 	/* if the next hop is a proxy, just repeat this call; also handle the
 	   VALIDATE check, which means having our own callback handlers too... */
 	SETUP_PID;
@@ -2270,10 +2516,19 @@ static NTSTATUS rpclite_proxy_Read(struct ntvfs_module_context *ntvfs,
 	DEBUG(5,("Anticipated digest\n"));
 	dump_data(5, r->in.digest.digest, sizeof(r->in.digest.digest));
 
+	/* If the remote end is a proxy, just fix up the file handle and pass
+	   through, but update the cache on the way back:
+	if (PROXY_REMOTE_SERVER(private) && (r->in.flags & PROXY_VALIDATE)) {
+	}
+	*/
 	/* prepare for response */
 	r->out.response.generic.data=talloc_array(io, uint8_t, r->in.maxcnt);
 	NT_STATUS_HAVE_NO_MEMORY(r->out.response.generic.data);
 
+	if (! PROXY_REMOTE_SERVER(private) && (r->in.flags & PROXY_VALIDATE)) {
+		return proxy_validate(ntvfs, req, r, &file);
+	}
+
 	/* pack up an smb_read request and dispatch here */
 	io->readx.level=RAW_READ_READX;
 	io->readx.in.file=file;
@@ -2825,6 +3080,7 @@ NTSTATUS async_proxy_smb_raw_read_rpc(struct async_info *async,
 	if (r->in.flags & PROXY_VALIDATE) {
 		DEBUG(5,("Cached data did not validate, flags: %x\n",r->out.flags));
 		/* turn off validate on this file */
+		//cache_handle_novalidate(f);
 #warning turn off validate on this file - do an nread
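
---
For readers following the fragment-copy logic in async_proxy_validate_parts()
above: each fragment covers [frag_offset, frag_offset + frag_nread) and only
the part overlapping the caller's window [req_offset, req_offset + req_mincnt)
is copied into the reply buffer. A minimal standalone sketch of that window
arithmetic; the names are illustrative, not from the patch:

#include <stdint.h>
#include <string.h>
#include <sys/types.h>

#define MIN(a,b) ((a)<(b)?(a):(b))
#define MAX(a,b) ((a)>(b)?(a):(b))

/* Copy the overlap between one fragment and the requested window.
   Returns how many bytes are now valid from the start of 'out'. */
static ssize_t copy_fragment(uint8_t *out, off_t req_offset, size_t req_mincnt,
			     const uint8_t *frag, off_t frag_offset,
			     size_t frag_nread)
{
	off_t extent = frag_offset + frag_nread;	/* end of fragment data */
	off_t start_offset = MAX(frag_offset, req_offset);
	off_t io_extent = req_offset + req_mincnt;	/* don't go past mincnt */
	off_t end_offset = MIN(io_extent, extent);

	if (start_offset >= io_extent || end_offset <= start_offset)
		return 0;	/* no overlap with the requested window */

	/* in the patch src can equal dst when we handle our own read
	   rather than latching onto someone else's; here they are distinct */
	memcpy(out + (start_offset - req_offset),
	       frag + (start_offset - frag_offset),
	       end_offset - start_offset);
	return end_offset - req_offset;
}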
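The validation itself is an incremental digest over the fragments in offset
order, compared with the anticipated digest sent by the client (the memcmp()
in proxy_validate_complete() above). A synchronous sketch of that shape,
using OpenSSL's MD5 as a stand-in for Samba's MD5Init/MD5Update/MD5Final;
the read_fragment() helper is hypothetical:

#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <openssl/md5.h>

/* Hypothetical blocking read: fetch up to 'len' bytes at 'offset' into
   'buf', returning the number of bytes actually read (0 on EOF/error). */
extern size_t read_fragment(off_t offset, uint8_t *buf, size_t len);

static bool validate_range(off_t offset, size_t maxcnt,
			   const uint8_t expected[MD5_DIGEST_LENGTH])
{
	uint8_t buf[65536];	/* stands in for a max_xmit-sized chunk */
	uint8_t digest[MD5_DIGEST_LENGTH];
	MD5_CTX ctx;
	size_t remaining = maxcnt;

	MD5_Init(&ctx);
	while (remaining > 0) {
		size_t want = remaining < sizeof(buf) ? remaining : sizeof(buf);
		size_t got = read_fragment(offset, buf, want);
		if (got == 0) break;		/* short read: cannot validate */
		MD5_Update(&ctx, buf, got);	/* must be fed in offset order */
		offset += got;
		remaining -= got;
	}
	MD5_Final(digest, &ctx);
	return remaining == 0 &&
	       memcmp(digest, expected, MD5_DIGEST_LENGTH) == 0;
}

The patch issues its reads asynchronously instead, which is why the TODO in
async_proxy_validate_parts() notes that out-of-order fragments would have to
be queued before MD5Update() sees them.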