CVE-2023-34968: mdscli: return share relative paths
[Samba.git] / python / samba / tests / dcerpc / mdssvc.py
blob5002e5d26d64926496792258c557cdd82f348a84
2 # Unix SMB/CIFS implementation.
3 # Copyright Ralph Boehme <slow@samba.org> 2019
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Tests for samba.dcerpc.mdssvc"""
21 import os
22 import time
23 import threading
24 import logging
25 import json
26 from http.server import HTTPServer, BaseHTTPRequestHandler
27 from samba.dcerpc import mdssvc
28 from samba.tests import RpcInterfaceTestCase
29 from samba.samba3 import mdscli
30 from samba.logger import get_samba_logger
# Module-level logger; the HTTP request handler below uses it to report
# request bodies that do not match the expected JSON.
logger = get_samba_logger(name=__name__)
# Names of the files the test fixture creates inside the share before each
# test and removes again afterwards.  Besides two plain names, the list holds
# names containing characters that the escaping test searches for
# (+ * = ' ? " \ ( and a space).
testfiles = [
    "foo",
    "bar",
    "x+x",
    "x*x",
    "x=x",
    "x'x",
    "x?x",
    "x\"x",
    "x\\x",
    "x(x",
    "x x",
]
class MdssvcHTTPRequestHandler(BaseHTTPRequestHandler):
    """Canned-response HTTP handler used by the tests in this module.

    The owning server object is expected to carry two string attributes:
    ``json_in``, the JSON text every POST body has to match, and
    ``json_out``, the JSON text that is sent back when it does.
    """

    def do_POST(self):
        """Reply with ``server.json_out`` if the body matches ``server.json_in``.

        Request body and expectation are both parsed with ``json.loads`` and
        compared as Python objects.  A mismatch is logged and answered with
        an HTTP 400 error that spells out both documents.
        """
        nbytes = int(self.headers['content-length'])
        received = json.loads(self.rfile.read(nbytes))
        wanted = json.loads(self.server.json_in)

        if received == wanted:
            payload = self.server.json_out.encode("utf-8")
            self.send_response(200)
            self.send_header('content-type', 'application/json; charset=UTF-8')
            self.send_header('content-length', len(payload))
            self.end_headers()
            self.wfile.write(payload)
            return

        # Unexpected request: report it on the test side and to the client.
        logger.error("Bad request, expected:\n%s\nGot:\n%s\n" % (wanted, received))
        explanation = "Expected: %s\nGot: %s\n" % (wanted, received)
        self.send_error(400, "Bad request", explanation)
class MdssvcTests(RpcInterfaceTestCase):
    """Run mdscli searches over the mdssvc pipe against a canned HTTP server.

    Each test starts a local ``HTTPServer`` driven by
    ``MdssvcHTTPRequestHandler``, tells it which JSON request to expect and
    which JSON response to return, issues a search through ``mdscli`` and
    compares the returned results with the expected list.
    """

    def setUp(self):
        """Open the RPC pipe, start the HTTP server thread and create files.

        Requires the ``LOCAL_PATH`` environment variable, which is used as
        the directory the fixture files are created in.
        """
        super(MdssvcTests, self).setUp()

        self.pipe = mdssvc.mdssvc('ncacn_np:fileserver[/pipe/mdssvc]',
                                  self.get_loadparm())

        # Binding and activation are deferred to http_server(), which runs
        # in the background thread started below.
        self.server = HTTPServer(('10.53.57.35', 8080),
                                 MdssvcHTTPRequestHandler,
                                 bind_and_activate=False)

        self.t = threading.Thread(target=MdssvcTests.http_server, args=(self,))
        # Thread.setDaemon() is deprecated; set the attribute instead.
        self.t.daemon = True
        self.t.start()
        self.sharepath = os.environ["LOCAL_PATH"]
        # Give the server thread a moment to bind and start serving.
        time.sleep(1)

        # Ask the server for the share path once; run_test() passes it on
        # to every search.
        conn = mdscli.conn(self.pipe, 'spotlight', '/foo')
        self.fakepath = conn.sharepath()
        conn.disconnect(self.pipe)

        for name in testfiles:
            # Only the (empty) file needs to exist.
            with open("%s/%s" % (self.sharepath, name), "w"):
                pass

    def tearDown(self):
        """Run the inherited teardown and remove the fixture files."""
        # Start the super() lookup at this class so no tearDown() in the
        # MRO is skipped.
        super(MdssvcTests, self).tearDown()
        # NOTE(review): the HTTP server started in setUp() is not shut down
        # here; it keeps running in its daemon thread - confirm that this is
        # intended.
        for name in testfiles:
            os.remove("%s/%s" % (self.sharepath, name))

    def http_server(self):
        """Thread target: bind and activate the server, then serve requests."""
        self.server.server_bind()
        self.server.server_activate()
        self.server.serve_forever()

    def run_test(self, query, expect, json_in, json_out):
        """Run one search and compare its results with ``expect``.

        :param query: query string handed to ``conn.search()``
        :param expect: list of results the search must return
        :param json_in: JSON text the HTTP server must receive; every
            ``%BASEPATH%`` is replaced with ``self.sharepath``
        :param json_out: JSON text the HTTP server answers with; every
            ``%BASEPATH%`` is replaced with ``self.sharepath``
        """
        self.server.json_in = json_in.replace("%BASEPATH%", self.sharepath)
        self.server.json_out = json_out.replace("%BASEPATH%", self.sharepath)

        self.conn = mdscli.conn(self.pipe, 'spotlight', '/foo')
        try:
            search = self.conn.search(self.pipe, query, self.fakepath)
            try:
                # Give it some time, the get_results() below returns
                # immediately what's available, so if we ask too soon, we
                # might get back no results as the server is still
                # processing the request
                time.sleep(1)

                results = search.get_results(self.pipe)
                self.assertEqual(results, expect)
            finally:
                # Release the search even when the assertion above fails.
                search.close(self.pipe)
        finally:
            self.conn.disconnect(self.pipe)

    def test_mdscli_search(self):
        """A simple wildcard query returns the two matching file names."""
        exp_json_query = r'''{
          "from": 0, "size": 100, "_source": ["path.real"],
          "query": {
            "query_string": {
              "query": "(samba*) AND path.real.fulltext:\"%BASEPATH%\""
            }
          }
        }'''
        fake_json_response = '''{
          "hits" : {
            "total" : { "value" : 2},
            "hits" : [
              {"_source" : {"path" : {"real" : "%BASEPATH%/foo"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/bar"}}}
            ]
          }
        }'''
        exp_results = ["foo", "bar"]
        self.run_test('*=="samba*"', exp_results, exp_json_query, fake_json_response)

    def test_mdscli_search_escapes(self):
        """File names with special characters are escaped in the JSON query."""
        sl_query = (
            r'kMDItemFSName=="x+x"||'
            r'kMDItemFSName=="x\*x"||'
            r'kMDItemFSName=="x=x"||'
            'kMDItemFSName=="x\'x"||'
            r'kMDItemFSName=="x?x"||'
            r'kMDItemFSName=="x x"||'
            r'kMDItemFSName=="x(x"||'
            r'kMDItemFSName=="x\"x"||'
            r'kMDItemFSName=="x\\x"'
        )
        exp_json_query = r'''{
          "from": 0, "size": 100, "_source": ["path.real"],
          "query": {
            "query_string": {
              "query": "(file.filename:x\\+x OR file.filename:x\\*x OR file.filename:x=x OR file.filename:x'x OR file.filename:x\\?x OR file.filename:x\\ x OR file.filename:x\\(x OR file.filename:x\\\"x OR file.filename:x\\\\x) AND path.real.fulltext:\"%BASEPATH%\""
            }
          }
        }'''
        fake_json_response = r'''{
          "hits" : {
            "total" : {"value" : 2},
            "hits" : [
              {"_source" : {"path" : {"real" : "%BASEPATH%/x+x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x*x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x=x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x'x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x?x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x(x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x\"x"}}},
              {"_source" : {"path" : {"real" : "%BASEPATH%/x\\x"}}}
            ]
          }
        }'''
        exp_results = [
            r"x+x",
            r"x*x",
            r"x=x",
            r"x'x",
            r"x?x",
            r"x x",
            r"x(x",
            "x\"x",
            r"x\x",
        ]
        self.run_test(sl_query, exp_results, exp_json_query, fake_json_response)