OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python2.7
/
dist-packages
/
samba
/
tests
Server IP: 10.0.0.4
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
02/03/2022 06:37:41 AM
rwxr-xr-x
📄
__init__.py
14.23 KB
11/15/2017 07:42:13 AM
rw-r--r--
📄
__init__.pyc
17.38 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth.py
2.36 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
auth.pyc
2.47 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log.py
56.47 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
auth_log.pyc
40.28 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log_base.py
4.18 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
auth_log_base.pyc
4.1 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log_ncalrpc.py
4.07 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
auth_log_ncalrpc.pyc
3.79 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log_netlogon.py
5.05 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
auth_log_netlogon.pyc
4.56 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log_netlogon_bad_creds.py
7.06 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
auth_log_netlogon_bad_creds.pyc
6.55 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log_pass_change.py
12.91 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
auth_log_pass_change.pyc
10.67 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
auth_log_samlogon.py
6.66 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
auth_log_samlogon.pyc
6.19 KB
02/03/2022 06:37:41 AM
rw-r--r--
📁
blackbox
-
02/03/2022 06:37:41 AM
rwxr-xr-x
📄
common.py
3.04 KB
02/07/2018 08:37:51 AM
rw-r--r--
📄
common.pyc
2.86 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
core.py
2.7 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
core.pyc
3.77 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
credentials.py
19.83 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
credentials.pyc
19.06 KB
02/03/2022 06:37:41 AM
rw-r--r--
📁
dcerpc
-
02/03/2022 06:37:41 AM
rwxr-xr-x
📄
dns.py
51.4 KB
01/25/2022 03:20:03 PM
rw-r--r--
📄
dns.pyc
36.48 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
dns_base.py
13.99 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
dns_base.pyc
13.28 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
dns_forwarder.py
21.23 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
dns_forwarder.pyc
20.21 KB
02/03/2022 06:37:41 AM
rw-r--r--
📁
dns_forwarder_helpers
-
02/03/2022 06:37:41 AM
rwxr-xr-x
📄
dns_packet.py
6.6 KB
01/25/2022 03:20:03 PM
rw-r--r--
📄
dns_packet.pyc
6.97 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
dns_tkey.py
7.24 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
dns_tkey.pyc
6.13 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
dns_wildcard.py
10.9 KB
11/02/2017 11:38:36 AM
rw-r--r--
📄
dns_wildcard.pyc
8.4 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
docs.py
13.95 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
docs.pyc
11.37 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
dsdb.py
18.84 KB
08/15/2017 07:16:59 AM
rw-r--r--
📄
dsdb.pyc
14.93 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
dsdb_schema_attributes.py
7.93 KB
11/02/2017 11:38:36 AM
rw-r--r--
📄
dsdb_schema_attributes.pyc
6.96 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
gensec.py
7.85 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
gensec.pyc
6.61 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
get_opt.py
1.86 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
get_opt.pyc
1.75 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
glue.py
2.59 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
glue.pyc
3.43 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
hostconfig.py
2.15 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
hostconfig.pyc
3.45 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
join.py
6.5 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
join.pyc
5.45 KB
02/03/2022 06:37:41 AM
rw-r--r--
📁
kcc
-
02/03/2022 06:37:41 AM
rwxr-xr-x
📄
libsmb_samba_internal.py
2.38 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
libsmb_samba_internal.pyc
2.71 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
lsa_string.py
2.52 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
lsa_string.pyc
2.21 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
messaging.py
4.97 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
messaging.pyc
5.37 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
net_join.py
2.29 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
net_join.pyc
2.22 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
net_join_no_spnego.py
3.34 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
net_join_no_spnego.pyc
3.15 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
netcmd.py
3 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
netcmd.pyc
3.95 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
netlogonsvc.py
2.43 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
netlogonsvc.pyc
2.14 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
ntacls.py
4.09 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
ntacls.pyc
4.69 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
ntlmauth.py
3 KB
08/29/2017 04:12:36 AM
rw-r--r--
📄
ntlmauth.pyc
2.73 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
pam_winbind.py
1.67 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
pam_winbind.pyc
1.43 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
param.py
3.59 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
param.pyc
4.8 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
password_hash.py
12.44 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
password_hash.pyc
7.84 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
password_hash_fl2003.py
7.38 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
password_hash_fl2003.pyc
5.48 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
password_hash_fl2008.py
7.94 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
password_hash_fl2008.pyc
5.66 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
password_hash_gpgme.py
8.78 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
password_hash_gpgme.pyc
6.41 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
password_hash_ldap.py
4.85 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
password_hash_ldap.pyc
4.63 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
policy.py
1.15 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
policy.pyc
1.03 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
posixacl.py
37.62 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
posixacl.pyc
26.85 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
provision.py
6.22 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
provision.pyc
9.79 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
py_credentials.py
13.71 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
py_credentials.pyc
10.64 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
registry.py
1.73 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
registry.pyc
2.37 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
samba3.py
8.24 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
samba3.pyc
11.54 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
samba3sam.py
48.33 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
samba3sam.pyc
31.65 KB
02/03/2022 06:37:41 AM
rw-r--r--
📁
samba_tool
-
02/03/2022 06:37:41 AM
rwxr-xr-x
📄
samdb.py
3.51 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
samdb.pyc
3.11 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
security.py
5.36 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
security.pyc
7.81 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
source.py
8.06 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
source.pyc
7.48 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
strings.py
4.12 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
strings.pyc
2.96 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
subunitrun.py
2.33 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
subunitrun.pyc
1.84 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
unicodenames.py
1.07 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
unicodenames.pyc
555 bytes
02/03/2022 06:37:41 AM
rw-r--r--
📄
upgrade.py
1.36 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
upgrade.pyc
1.28 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
upgradeprovision.py
6.66 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
upgradeprovision.pyc
6.57 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
upgradeprovisionneeddc.py
7.29 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
upgradeprovisionneeddc.pyc
8.08 KB
02/03/2022 06:37:41 AM
rw-r--r--
📄
xattr.py
4.11 KB
07/04/2017 10:05:25 AM
rw-r--r--
📄
xattr.pyc
4.57 KB
02/03/2022 06:37:41 AM
rw-r--r--
Editing: dsdb.py
Close
# Unix SMB/CIFS implementation. Tests for dsdb # Copyright (C) Matthieu Patou <mat@matws.net> 2010 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # """Tests for samba.dsdb.""" from samba.credentials import Credentials from samba.samdb import SamDB from samba.auth import system_session from samba.tests import TestCase from samba.ndr import ndr_unpack, ndr_pack from samba.dcerpc import drsblobs from samba import dsdb import ldb import os import samba import gc import time class DsdbTests(TestCase): def setUp(self): super(DsdbTests, self).setUp() self.lp = samba.tests.env_loadparm() self.creds = Credentials() self.creds.guess(self.lp) self.session = system_session() self.samdb = SamDB(session_info=self.session, credentials=self.creds, lp=self.lp) def test_get_oid_from_attrid(self): oid = self.samdb.get_oid_from_attid(591614) self.assertEquals(oid, "1.2.840.113556.1.4.1790") def test_error_replpropertymetadata(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) ctr = repl.ctr for o in ctr.array: # Search for Description if o.attid == 13: old_version = o.version o.version = o.version + 1 replBlob = ndr_pack(repl) msg = ldb.Message() msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_error_replpropertymetadata_nochange(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) replBlob = ndr_pack(repl) msg = ldb.Message() msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_error_replpropertymetadata_allow_sort(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) replBlob = ndr_pack(repl) msg = ldb.Message() msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"]) def test_twoatt_replpropertymetadata(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData", "uSNChanged"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) ctr = repl.ctr for o in ctr.array: # Search for Description if o.attid == 13: old_version = o.version o.version = o.version + 1 o.local_usn = long(str(res[0]["uSNChanged"])) + 1 replBlob = ndr_pack(repl) msg = ldb.Message() msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description") self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_set_replpropertymetadata(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData", "uSNChanged"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) ctr = repl.ctr for o in ctr.array: # Search for Description if o.attid == 13: old_version = o.version o.version = o.version + 1 o.local_usn = long(str(res[0]["uSNChanged"])) + 1 o.originating_usn = long(str(res[0]["uSNChanged"])) + 1 replBlob = ndr_pack(repl) msg = ldb.Message() msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_ok_get_attribute_from_attid(self): self.assertEquals(self.samdb.get_attribute_from_attid(13), "description") def test_ko_get_attribute_from_attid(self): self.assertEquals(self.samdb.get_attribute_from_attid(11979), None) def test_get_attribute_replmetadata_version(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["dn"]) self.assertEquals(len(res), 1) dn = str(res[0].dn) self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1) def test_set_attribute_replmetadata_version(self): res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["dn"]) self.assertEquals(len(res), 1) dn = str(res[0].dn) version = self.samdb.get_attribute_replmetadata_version(dn, "description") self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2) self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2) def test_db_lock1(self): basedn = self.samdb.get_default_basedn() (r1, w1) = os.pipe() pid = os.fork() if pid == 0: # In the child, close the main DB, re-open just one DB del(self.samdb) gc.collect() self.samdb = SamDB(session_info=self.session, credentials=self.creds, lp=self.lp) self.samdb.transaction_start() dn = "cn=test_db_lock_user,cn=users," + str(basedn) self.samdb.add({ "dn": dn, "objectclass": "user", }) self.samdb.delete(dn) # Obtain a write lock self.samdb.transaction_prepare_commit() os.write(w1, b"prepared") time.sleep(2) # Drop the write lock self.samdb.transaction_cancel() os._exit(0) self.assertEqual(os.read(r1, 8), b"prepared") start = time.time() # We need to hold this iterator open to hold the all-record lock. res = self.samdb.search_iterator() # This should take at least 2 seconds because the transaction # has a write lock on one backend db open # Release the locks for l in res: pass end = time.time() self.assertGreater(end - start, 1.9) (got_pid, status) = os.waitpid(pid, 0) self.assertEqual(got_pid, pid) self.assertTrue(os.WIFEXITED(status)) self.assertEqual(os.WEXITSTATUS(status), 0) def test_db_lock2(self): basedn = self.samdb.get_default_basedn() (r1, w1) = os.pipe() (r2, w2) = os.pipe() pid = os.fork() if pid == 0: # In the child, close the main DB, re-open del(self.samdb) gc.collect() self.samdb = SamDB(session_info=self.session, credentials=self.creds, lp=self.lp) # We need to hold this iterator open to hold the all-record lock. res = self.samdb.search_iterator() os.write(w2, b"start") if (os.read(r1, 7) != b"started"): os._exit(1) os.write(w2, b"add") if (os.read(r1, 5) != b"added"): os._exit(2) # Wait 2 seconds to block prepare_commit() in the child. os.write(w2, b"prepare") time.sleep(2) # Release the locks for l in res: pass if (os.read(r1, 8) != b"prepared"): os._exit(3) os._exit(0) # We can start the transaction during the search # because both just grab the all-record read lock. self.assertEqual(os.read(r2, 5), b"start") self.samdb.transaction_start() os.write(w1, b"started") self.assertEqual(os.read(r2, 3), b"add") dn = "cn=test_db_lock_user,cn=users," + str(basedn) self.samdb.add({ "dn": dn, "objectclass": "user", }) self.samdb.delete(dn) os.write(w1, b"added") # Obtain a write lock, this will block until # the parent releases the read lock. self.assertEqual(os.read(r2, 7), b"prepare") start = time.time() self.samdb.transaction_prepare_commit() end = time.time() try: self.assertGreater(end - start, 1.9) except: raise finally: os.write(w1, b"prepared") # Drop the write lock self.samdb.transaction_cancel() (got_pid, status) = os.waitpid(pid, 0) self.assertEqual(got_pid, pid) self.assertTrue(os.WIFEXITED(status)) self.assertEqual(os.WEXITSTATUS(status), 0) def test_db_lock3(self): basedn = self.samdb.get_default_basedn() (r1, w1) = os.pipe() (r2, w2) = os.pipe() pid = os.fork() if pid == 0: # In the child, close the main DB, re-open del(self.samdb) gc.collect() self.samdb = SamDB(session_info=self.session, credentials=self.creds, lp=self.lp) # We need to hold this iterator open to hold the all-record lock. res = self.samdb.search_iterator() os.write(w2, b"start") if (os.read(r1, 7) != b"started"): os._exit(1) os.write(w2, b"add") if (os.read(r1, 5) != b"added"): os._exit(2) # Wait 2 seconds to block prepare_commit() in the child. os.write(w2, b"prepare") time.sleep(2) # Release the locks for l in res: pass if (os.read(r1, 8) != b"prepared"): os._exit(3) os._exit(0) # We can start the transaction during the search # because both just grab the all-record read lock. self.assertEqual(os.read(r2, 5), b"start") self.samdb.transaction_start() os.write(w1, b"started") self.assertEqual(os.read(r2, 3), b"add") # This will end up in the top level db dn = "@DSDB_LOCK_TEST" self.samdb.add({ "dn": dn}) self.samdb.delete(dn) os.write(w1, b"added") # Obtain a write lock, this will block until # the child releases the read lock. self.assertEqual(os.read(r2, 7), b"prepare") start = time.time() self.samdb.transaction_prepare_commit() end = time.time() self.assertGreater(end - start, 1.9) os.write(w1, b"prepared") # Drop the write lock self.samdb.transaction_cancel() (got_pid, status) = os.waitpid(pid, 0) self.assertTrue(os.WIFEXITED(status)) self.assertEqual(os.WEXITSTATUS(status), 0) self.assertEqual(got_pid, pid) def _test_full_db_lock1(self, backend_path): (r1, w1) = os.pipe() pid = os.fork() if pid == 0: # In the child, close the main DB, re-open just one DB del(self.samdb) gc.collect() backenddb = ldb.Ldb(backend_path) backenddb.transaction_start() backenddb.add({"dn":"@DSDB_LOCK_TEST"}) backenddb.delete("@DSDB_LOCK_TEST") # Obtain a write lock backenddb.transaction_prepare_commit() os.write(w1, b"prepared") time.sleep(2) # Drop the write lock backenddb.transaction_cancel() os._exit(0) self.assertEqual(os.read(r1, 8), b"prepared") start = time.time() # We need to hold this iterator open to hold the all-record lock. res = self.samdb.search_iterator() # This should take at least 2 seconds because the transaction # has a write lock on one backend db open end = time.time() self.assertGreater(end - start, 1.9) # Release the locks for l in res: pass (got_pid, status) = os.waitpid(pid, 0) self.assertEqual(got_pid, pid) self.assertTrue(os.WIFEXITED(status)) self.assertEqual(os.WEXITSTATUS(status), 0) def test_full_db_lock1(self): basedn = self.samdb.get_default_basedn() backend_filename = "%s.ldb" % basedn.get_casefold() backend_subpath = os.path.join("sam.ldb.d", backend_filename) backend_path = self.lp.private_path(backend_subpath) self._test_full_db_lock1(backend_path) def test_full_db_lock1_config(self): basedn = self.samdb.get_config_basedn() backend_filename = "%s.ldb" % basedn.get_casefold() backend_subpath = os.path.join("sam.ldb.d", backend_filename) backend_path = self.lp.private_path(backend_subpath) self._test_full_db_lock1(backend_path) def _test_full_db_lock2(self, backend_path): (r1, w1) = os.pipe() (r2, w2) = os.pipe() pid = os.fork() if pid == 0: # In the child, close the main DB, re-open del(self.samdb) gc.collect() self.samdb = SamDB(session_info=self.session, credentials=self.creds, lp=self.lp) # We need to hold this iterator open to hold the all-record lock. res = self.samdb.search_iterator() os.write(w2, b"start") if (os.read(r1, 7) != b"started"): os._exit(1) os.write(w2, b"add") if (os.read(r1, 5) != b"added"): os._exit(2) # Wait 2 seconds to block prepare_commit() in the child. os.write(w2, b"prepare") time.sleep(2) # Release the locks for l in res: pass if (os.read(r1, 8) != b"prepared"): os._exit(3) os._exit(0) # In the parent, close the main DB, re-open just one DB del(self.samdb) gc.collect() backenddb = ldb.Ldb(backend_path) # We can start the transaction during the search # because both just grab the all-record read lock. self.assertEqual(os.read(r2, 5), b"start") backenddb.transaction_start() os.write(w1, b"started") self.assertEqual(os.read(r2, 3), b"add") backenddb.add({"dn":"@DSDB_LOCK_TEST"}) backenddb.delete("@DSDB_LOCK_TEST") os.write(w1, b"added") # Obtain a write lock, this will block until # the child releases the read lock. self.assertEqual(os.read(r2, 7), b"prepare") start = time.time() backenddb.transaction_prepare_commit() end = time.time() try: self.assertGreater(end - start, 1.9) except: raise finally: os.write(w1, b"prepared") # Drop the write lock backenddb.transaction_cancel() (got_pid, status) = os.waitpid(pid, 0) self.assertEqual(got_pid, pid) self.assertTrue(os.WIFEXITED(status)) self.assertEqual(os.WEXITSTATUS(status), 0) def test_full_db_lock2(self): basedn = self.samdb.get_default_basedn() backend_filename = "%s.ldb" % basedn.get_casefold() backend_subpath = os.path.join("sam.ldb.d", backend_filename) backend_path = self.lp.private_path(backend_subpath) self._test_full_db_lock2(backend_path) def test_full_db_lock2_config(self): basedn = self.samdb.get_config_basedn() backend_filename = "%s.ldb" % basedn.get_casefold() backend_subpath = os.path.join("sam.ldb.d", backend_filename) backend_path = self.lp.private_path(backend_subpath) self._test_full_db_lock2(backend_path) def test_no_error_on_invalid_control(self): try: res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData"], controls=["local_oid:%s:0" % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED]) except ldb.LdbError as e: self.fail("Should have not raised an exception") def test_error_on_invalid_critical_control(self): try: res = self.samdb.search(expression="cn=Administrator", scope=ldb.SCOPE_SUBTREE, attrs=["replPropertyMetaData"], controls=["local_oid:%s:1" % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED]) except ldb.LdbError as e: if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION: self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION" % e[1])